From feb7f45533d7a195b2ced5a50be359ac8549151b Mon Sep 17 00:00:00 2001 From: normalizedwater546 Date: Sat, 23 Nov 2024 12:19:36 +0000 Subject: [PATCH] fix: semaphore bound to different event loop --- nhentai/downloader.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/nhentai/downloader.py b/nhentai/downloader.py index c28e944..9e3b21d 100644 --- a/nhentai/downloader.py +++ b/nhentai/downloader.py @@ -13,6 +13,7 @@ import httpx urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + class NHentaiImageNotExistException(Exception): pass @@ -32,16 +33,24 @@ def download_callback(result): logger.log(16, f'{data} downloaded successfully') +async def fiber(tasks): + for completed_task in asyncio.as_completed(tasks): + try: + result = await completed_task + logger.info(f'{result[1]} download completed') + except Exception as e: + logger.error(f'An error occurred: {e}') + + class Downloader(Singleton): def __init__(self, path='', threads=5, timeout=30, delay=0): - self.semaphore = asyncio.Semaphore(threads) + self.threads = threads self.path = str(path) self.timeout = timeout self.delay = delay - async def _semaphore_download(self, *args, **kwargs): - # This sets a concurrency limit for AsyncIO - async with self.semaphore: + async def _semaphore_download(self, semaphore, *args, **kwargs): + async with semaphore: return await self.download(*args, **kwargs) async def download(self, url, folder='', filename='', retried=0, proxy=None): @@ -125,7 +134,7 @@ class Downloader(Singleton): def start_download(self, queue, folder='') -> bool: logger.warning("Proxy temporarily unavailable, it will be fixed later. ") - if not isinstance(folder, (str, )): + if not isinstance(folder, (str,)): folder = str(folder) if self.path: @@ -143,19 +152,14 @@ class Downloader(Singleton): # Assuming we want to continue with rest of process. return True - async def fiber(tasks): - for completed_task in asyncio.as_completed(tasks): - try: - result = await completed_task - logger.info(f'{result[1]} download completed') - except Exception as e: - logger.error(f'An error occurred: {e}') + semaphore = asyncio.Semaphore(self.threads) - tasks = [ - self._semaphore_download(url, filename=os.path.basename(urlparse(url).path)) + coroutines = [ + self._semaphore_download(semaphore, url, filename=os.path.basename(urlparse(url).path)) for url in queue ] + # Prevent coroutines infection - asyncio.run(fiber(tasks)) + asyncio.run(fiber(coroutines)) return True