# coding: utf-8
import os
import asyncio
import httpx
import urllib3.exceptions
import zipfile
import io
from urllib.parse import urlparse

from doujinshi_dl.core.logger import logger
from doujinshi_dl.core.utils.db import Singleton
from doujinshi_dl.core import config as core_config


async def _async_request(method, url, timeout=30, proxy=None):
    """Minimal async HTTP helper using httpx directly.

    Builds per-request ``Cookie`` / ``User-Agent`` headers from
    ``plugin_config`` and follows redirects.  TLS verification is
    disabled deliberately (mirror hosts frequently ship broken certs).

    :param method: HTTP verb, e.g. ``'GET'``.
    :param url: absolute URL to request.
    :param timeout: per-request timeout in seconds.
    :param proxy: optional proxy URL string.
    :return: the ``httpx.Response`` (body fully read before the client closes).
    """
    # httpx >= 0.28 uses `proxy` (str); older versions used `proxies` (dict).
    client_kwargs = {'verify': False}
    if proxy:
        client_kwargs['proxy'] = proxy

    async with httpx.AsyncClient(**client_kwargs) as client:
        headers = {}
        cookie = core_config.get('plugin_config', {}).get('cookie', '')
        useragent = core_config.get('plugin_config', {}).get('useragent', '')
        if cookie:
            headers['Cookie'] = cookie
        if useragent:
            headers['User-Agent'] = useragent
        return await client.request(method, url, timeout=timeout,
                                    headers=headers, follow_redirects=True)


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def download_callback(result):
    """Log the outcome of one finished download task.

    :param result: ``(status, data)`` tuple as produced by
        ``Downloader.download`` — negative statuses are failures,
        positive means success.
    """
    result, data = result
    if result == 0:
        logger.warning('fatal errors occurred, ignored')
    elif result == -1:
        logger.warning(f'url {data} return status code 404')
    elif result == -2:
        logger.warning('Ctrl-C pressed, exiting sub processes ...')
    elif result == -3:
        # workers won't be run, just pass
        pass
    else:
        logger.log(16, f'{data} downloaded successfully')


class Downloader(Singleton):
    """Concurrency-limited async downloader that saves images to a folder."""

    def __init__(self, path='', threads=5, timeout=30, delay=0,
                 exit_on_fail=False, no_filename_padding=False):
        self.threads = threads
        self.path = str(path)
        self.timeout = timeout
        self.delay = delay
        self.exit_on_fail = exit_on_fail
        self.folder = None          # set by create_storage_object()
        self.semaphore = None       # set by fiber() before tasks run
        self.no_filename_padding = no_filename_padding

    async def fiber(self, tasks):
        """Await all download coroutines, logging each result.

        The semaphore is created here (inside the running event loop);
        the task coroutines are lazy, so they observe it when awaited.

        :raises Exception: re-raised on first failure when
            ``exit_on_fail`` is set.
        """
        self.semaphore = asyncio.Semaphore(self.threads)
        for completed_task in asyncio.as_completed(tasks):
            try:
                result = await completed_task
                if result[0] > 0:
                    logger.info(f'{result[1]} download completed')
                else:
                    raise Exception(f'{result[1]} download failed, return value {result[0]}')
            except Exception as e:
                logger.error(f'An error occurred: {e}')
                if self.exit_on_fail:
                    raise Exception('User intends to exit on fail')

    async def _semaphore_download(self, *args, **kwargs):
        """Run ``download`` while holding the shared concurrency semaphore."""
        async with self.semaphore:
            return await self.download(*args, **kwargs)

    async def download(self, url, folder='', filename='', retried=0, proxy=None, length=0):
        """Fetch one image URL, trying configured mirrors on failure.

        :param url: image URL.
        :param folder: unused here; kept for interface compatibility.
        :param filename: target filename; derived from the URL path if empty.
        :param retried: internal retry counter.
        :param proxy: optional proxy URL.
        :param length: zero-pad width for the filename stem.
        :return: ``(status, url)`` — ``1`` on success, negative on failure
            (see ``download_callback`` for the meaning of each code).
        """
        logger.info(f'Starting to download {url} ...')

        if self.delay:
            await asyncio.sleep(self.delay)

        filename = filename if filename else os.path.basename(urlparse(url).path)
        base_filename, extension = os.path.splitext(filename)

        if not self.no_filename_padding:
            filename = base_filename.zfill(length) + extension
        else:
            filename = base_filename + extension

        try:
            response = await _async_request('GET', url, timeout=self.timeout, proxy=proxy)

            if response.status_code != 200:
                path = urlparse(url).path
                image_url_mirrors = core_config.get('image_url_mirrors', [])
                for mirror in image_url_mirrors:
                    logger.info(f"Try mirror: {mirror}{path}")
                    mirror_url = f'{mirror}{path}'
                    response = await _async_request('GET', mirror_url,
                                                    timeout=self.timeout, proxy=proxy)
                    if response.status_code == 200:
                        break

            # Bug fix: previously an error body (e.g. an HTML 404 page) was
            # written to disk as an image when every mirror also failed.
            if response.status_code != 200:
                logger.error(f'Can not download image {url}')
                return -1, url

            if not await self.save(filename, response):
                logger.error(f'Can not download image {url}')
                return -1, url

        except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e:
            retry_times = core_config.get('retry_times', 3)
            if retried < retry_times:
                logger.warning(f'Download (unknown) failed, retrying({retried + 1}) times...')
                return await self.download(
                    url=url,
                    folder=folder,
                    filename=filename,
                    retried=retried + 1,
                    proxy=proxy,
                )
            else:
                logger.warning(f'Download (unknown) failed with {retry_times} times retried, skipped')
                return -2, url
        except Exception as e:
            import traceback

            logger.error(f"Exception type: {type(e)}")
            traceback.print_stack()
            logger.critical(str(e))
            return -9, url
        except KeyboardInterrupt:
            # Reachable: KeyboardInterrupt is a BaseException, not Exception.
            return -4, url

        return 1, url

    async def save(self, filename, response) -> bool:
        """Write the response body to ``self.folder``/``filename``.

        Streams in 2 KiB chunks when a ``content-length`` header is
        present; otherwise writes the already-loaded body in one go.

        :return: ``True`` on success, ``False`` when ``response`` is None.
        """
        if response is None:
            logger.error('Error: Response is None')
            return False
        save_file_path = os.path.join(self.folder, filename)
        with open(save_file_path, 'wb') as f:
            # Redundant inner `is not None` re-check removed — guarded above.
            length = response.headers.get('content-length')
            if length is None:
                f.write(response.content)
            else:
                async for chunk in response.aiter_bytes(2048):
                    f.write(chunk)
        return True

    def create_storage_object(self, folder: str):
        """Create the destination folder (idempotent) and a no-op closer."""
        try:
            # exist_ok avoids the exists()/makedirs() race of the old code.
            os.makedirs(folder, exist_ok=True)
        except OSError as e:
            logger.critical(str(e))
        self.folder: str = folder
        self.close = lambda: None  # Only meaningful in CompressedDownloader

    def start_download(self, queue, folder='') -> bool:
        """Download every URL in ``queue`` into ``folder``.

        :param queue: iterable of image URLs (must support ``len``).
        :param folder: destination folder, joined onto ``self.path``.
        :return: ``True`` (also when skipped via ``DEBUG=NODOWNLOAD``).
        """
        if not isinstance(folder, (str,)):
            folder = str(folder)

        if self.path:
            folder = os.path.join(self.path, folder)

        logger.info(f'Doujinshi will be saved at "{folder}"')
        self.create_storage_object(folder)

        if os.getenv('DEBUG', None) == 'NODOWNLOAD':
            # Assuming we want to continue with rest of process.
            return True

        digit_length = len(str(len(queue)))
        logger.info(f'Total download pages: {len(queue)}')
        coroutines = [
            self._semaphore_download(url, filename=os.path.basename(urlparse(url).path),
                                     length=digit_length)
            for url in queue
        ]

        # Prevent coroutines infection
        asyncio.run(self.fiber(coroutines))

        self.close()
        return True


class CompressedDownloader(Downloader):
    """Downloader variant that writes images into a single ZIP archive."""

    def create_storage_object(self, folder):
        """Open ``<folder>.zip`` for writing instead of creating a directory."""
        filename = f'{folder}.zip'
        logger.info(f'Creating archive {filename}')  # was a stray print()
        self.zipfile = zipfile.ZipFile(filename, 'w')
        self.close = lambda: self.zipfile.close()

    async def save(self, filename, response) -> bool:
        """Buffer the response body in memory and add it to the archive."""
        if response is None:
            logger.error('Error: Response is None')
            return False

        image_data = io.BytesIO()
        length = response.headers.get('content-length')
        if length is None:
            # Bug fix: `await response.read()` — httpx Response.read() is
            # synchronous and returns bytes, so awaiting it raised TypeError.
            # The body is already loaded; use .content directly.
            image_data.write(response.content)
        else:
            async for chunk in response.aiter_bytes(2048):
                image_data.write(chunk)

        image_data.seek(0)
        self.zipfile.writestr(filename, image_data.read())
        return True