fix: semaphore bound to different event loop

This commit is contained in:
normalizedwater546 2024-11-23 12:19:36 +00:00
parent 0754caaeb7
commit feb7f45533

View File

@ -13,6 +13,7 @@ import httpx
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class NHentaiImageNotExistException(Exception): class NHentaiImageNotExistException(Exception):
pass pass
@ -32,16 +33,24 @@ def download_callback(result):
logger.log(16, f'{data} downloaded successfully') logger.log(16, f'{data} downloaded successfully')
async def fiber(tasks):
for completed_task in asyncio.as_completed(tasks):
try:
result = await completed_task
logger.info(f'{result[1]} download completed')
except Exception as e:
logger.error(f'An error occurred: {e}')
class Downloader(Singleton): class Downloader(Singleton):
def __init__(self, path='', threads=5, timeout=30, delay=0): def __init__(self, path='', threads=5, timeout=30, delay=0):
self.semaphore = asyncio.Semaphore(threads) self.threads = threads
self.path = str(path) self.path = str(path)
self.timeout = timeout self.timeout = timeout
self.delay = delay self.delay = delay
async def _semaphore_download(self, *args, **kwargs): async def _semaphore_download(self, semaphore, *args, **kwargs):
# This sets a concurrency limit for AsyncIO async with semaphore:
async with self.semaphore:
return await self.download(*args, **kwargs) return await self.download(*args, **kwargs)
async def download(self, url, folder='', filename='', retried=0, proxy=None): async def download(self, url, folder='', filename='', retried=0, proxy=None):
@ -143,19 +152,14 @@ class Downloader(Singleton):
# Assuming we want to continue with rest of process. # Assuming we want to continue with rest of process.
return True return True
async def fiber(tasks): semaphore = asyncio.Semaphore(self.threads)
for completed_task in asyncio.as_completed(tasks):
try:
result = await completed_task
logger.info(f'{result[1]} download completed')
except Exception as e:
logger.error(f'An error occurred: {e}')
tasks = [ coroutines = [
self._semaphore_download(url, filename=os.path.basename(urlparse(url).path)) self._semaphore_download(semaphore, url, filename=os.path.basename(urlparse(url).path))
for url in queue for url in queue
] ]
# Prevent coroutines infection # Prevent coroutines infection
asyncio.run(fiber(tasks)) asyncio.run(fiber(coroutines))
return True return True