use multiprocess instead of threadpool #78

This commit is contained in:
RicterZ 2019-07-31 01:22:54 +08:00
parent bc70a2071b
commit 7e826c5255
3 changed files with 59 additions and 21 deletions

View File

@ -11,7 +11,7 @@ from nhentai.doujinshi import Doujinshi
from nhentai.downloader import Downloader from nhentai.downloader import Downloader
from nhentai.logger import logger from nhentai.logger import logger
from nhentai.constant import BASE_URL from nhentai.constant import BASE_URL
from nhentai.utils import generate_html, generate_cbz, generate_main_html, check_cookie from nhentai.utils import generate_html, generate_cbz, generate_main_html, check_cookie, signal_handler
def main(): def main():
@ -83,12 +83,8 @@ def main():
[doujinshi.show() for doujinshi in doujinshi_list] [doujinshi.show() for doujinshi in doujinshi_list]
def signal_handler(signal, frame):
logger.error('Ctrl-C signal received. Stopping...')
exit(1)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -1,10 +1,14 @@
# coding: utf- # coding: utf-
from __future__ import unicode_literals, print_function from __future__ import unicode_literals, print_function
import signal
from future.builtins import str as text from future.builtins import str as text
import os import os
import requests import requests
import threadpool import threadpool
import time import time
import multiprocessing as mp
try: try:
from urllib.parse import urlparse from urllib.parse import urlparse
@ -13,13 +17,13 @@ except ImportError:
from nhentai.logger import logger from nhentai.logger import logger
from nhentai.parser import request from nhentai.parser import request
from nhentai.utils import Singleton from nhentai.utils import Singleton, signal_handler
requests.packages.urllib3.disable_warnings() requests.packages.urllib3.disable_warnings()
semaphore = mp.Semaphore()
class NhentaiImageNotExistException(Exception): class NHentaiImageNotExistException(Exception):
pass pass
@ -28,14 +32,14 @@ class Downloader(Singleton):
def __init__(self, path='', thread=1, timeout=30, delay=0): def __init__(self, path='', thread=1, timeout=30, delay=0):
if not isinstance(thread, (int, )) or thread < 1 or thread > 15: if not isinstance(thread, (int, )) or thread < 1 or thread > 15:
raise ValueError('Invalid threads count') raise ValueError('Invalid threads count')
self.path = str(path) self.path = str(path)
self.thread_count = thread self.thread_count = thread
self.threads = [] self.threads = []
self.thread_pool = None
self.timeout = timeout self.timeout = timeout
self.delay = delay self.delay = delay
def _download(self, url, folder='', filename='', retried=0): def download_(self, url, folder='', filename='', retried=0):
if self.delay: if self.delay:
time.sleep(self.delay) time.sleep(self.delay)
logger.info('Starting to download {0} ...'.format(url)) logger.info('Starting to download {0} ...'.format(url))
@ -54,9 +58,9 @@ class Downloader(Singleton):
try: try:
response = request('get', url, stream=True, timeout=self.timeout) response = request('get', url, stream=True, timeout=self.timeout)
if response.status_code != 200: if response.status_code != 200:
raise NhentaiImageNotExistException raise NHentaiImageNotExistException
except NhentaiImageNotExistException as e: except NHentaiImageNotExistException as e:
raise e raise e
except Exception as e: except Exception as e:
@ -78,27 +82,37 @@ class Downloader(Singleton):
except (requests.HTTPError, requests.Timeout) as e: except (requests.HTTPError, requests.Timeout) as e:
if retried < 3: if retried < 3:
logger.warning('Warning: {0}, retrying({1}) ...'.format(str(e), retried)) logger.warning('Warning: {0}, retrying({1}) ...'.format(str(e), retried))
return 0, self._download(url=url, folder=folder, filename=filename, retried=retried+1) return 0, self.download_(url=url, folder=folder, filename=filename, retried=retried+1)
else: else:
return 0, None return 0, None
except NhentaiImageNotExistException as e: except NHentaiImageNotExistException as e:
os.remove(os.path.join(folder, base_filename.zfill(3) + extension)) os.remove(os.path.join(folder, base_filename.zfill(3) + extension))
return -1, url return -1, url
except Exception as e: except Exception as e:
import traceback
traceback.print_stack()
logger.critical(str(e)) logger.critical(str(e))
return 0, None return 0, None
except KeyboardInterrupt:
return -3, None
return 1, url return 1, url
def _download_callback(self, request, result): def _download_callback(self, result):
result, data = result result, data = result
if result == 0: if result == 0:
logger.warning('fatal errors occurred, ignored') logger.warning('fatal errors occurred, ignored')
# exit(1) # exit(1)
elif result == -1: elif result == -1:
logger.warning('url {} return status code 404'.format(data)) logger.warning('url {} return status code 404'.format(data))
elif result == -2:
logger.warning('Ctrl-C pressed, exiting sub processes ...')
elif result == -3:
# workers wont be run, just pass
pass
else: else:
logger.log(15, '{0} downloaded successfully'.format(data)) logger.log(15, '{0} downloaded successfully'.format(data))
@ -119,10 +133,31 @@ class Downloader(Singleton):
else: else:
logger.warn('Path \'{0}\' already exist.'.format(folder)) logger.warn('Path \'{0}\' already exist.'.format(folder))
queue = [([url], {'folder': folder}) for url in queue] queue = [(self, url, folder) for url in queue]
self.thread_pool = threadpool.ThreadPool(self.thread_count) pool = mp.Pool(self.thread_count, init_worker)
requests_ = threadpool.makeRequests(self._download, queue, self._download_callback) for item in queue:
[self.thread_pool.putRequest(req) for req in requests_] pool.apply_async(download_wrapper, args=item, callback=self._download_callback)
self.thread_pool.wait() pool.close()
print(1)
pool.join()
print(2)
def download_wrapper(obj, url, folder=''):
if semaphore.get_value():
return Downloader.download_(obj, url=url, folder=folder)
else:
return -3, None
def init_worker():
signal.signal(signal.SIGINT, subprocess_signal)
def subprocess_signal(signal, frame):
if semaphore.acquire(timeout=1):
logger.warning('Ctrl-C pressed, exiting sub processes ...')
raise KeyboardInterrupt

View File

@ -207,3 +207,10 @@ an invalid filename.
# Remove [] from filename # Remove [] from filename
filename = filename.replace('[]', '') filename = filename.replace('[]', '')
return filename return filename
def signal_handler(signal, frame):
logger.error('Ctrl-C signal received. Stopping...')
exit(1)