Merge pull request #402 from hzxjy1/zipTest

Close zipfile hander manually and add a test
This commit is contained in:
Ricter Zheng 2025-03-26 22:57:29 +08:00 committed by GitHub
commit 6752edfc9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 7 deletions

View File

@ -139,6 +139,7 @@ class Downloader(Singleton):
except EnvironmentError as e: except EnvironmentError as e:
logger.critical(str(e)) logger.critical(str(e))
self.folder:str = folder self.folder:str = folder
self.close = lambda: None # Only available in class CompressedDownloader
def start_download(self, queue, folder='') -> bool: def start_download(self, queue, folder='') -> bool:
if not isinstance(folder, (str,)): if not isinstance(folder, (str,)):
@ -164,6 +165,8 @@ class Downloader(Singleton):
# Prevent coroutines infection # Prevent coroutines infection
asyncio.run(self.fiber(coroutines)) asyncio.run(self.fiber(coroutines))
self.close()
return True return True
class CompressedDownloader(Downloader): class CompressedDownloader(Downloader):
@ -171,6 +174,7 @@ class CompressedDownloader(Downloader):
filename = f'{folder}.zip' filename = f'{folder}.zip'
print(filename) print(filename)
self.zipfile = zipfile.ZipFile(filename,'w') self.zipfile = zipfile.ZipFile(filename,'w')
self.close = lambda: self.zipfile.close()
async def save(self, filename, response) -> bool: async def save(self, filename, response) -> bool:
if response is None: if response is None:
@ -189,6 +193,3 @@ class CompressedDownloader(Downloader):
image_data.seek(0) image_data.seek(0)
self.zipfile.writestr(filename, image_data.read()) self.zipfile.writestr(filename, image_data.read())
return True return True
def __del__(self):
self.zipfile.close()

View File

@ -1,14 +1,27 @@
import unittest import unittest
import os import os
import zipfile
import urllib3.exceptions import urllib3.exceptions
from nhentai import constant from nhentai import constant
from nhentai.cmdline import load_config from nhentai.cmdline import load_config
from nhentai.downloader import Downloader from nhentai.downloader import Downloader, CompressedDownloader
from nhentai.parser import doujinshi_parser from nhentai.parser import doujinshi_parser
from nhentai.doujinshi import Doujinshi from nhentai.doujinshi import Doujinshi
from nhentai.utils import generate_html from nhentai.utils import generate_html
did = 440546
def has_jepg_file(path):
with zipfile.ZipFile(path, 'r') as zf:
return '01.jpg' in zf.namelist()
def is_zip_file(path):
try:
with zipfile.ZipFile(path, 'r') as _:
return True
except (zipfile.BadZipFile, FileNotFoundError):
return False
class TestDownload(unittest.TestCase): class TestDownload(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
@ -17,17 +30,27 @@ class TestDownload(unittest.TestCase):
constant.CONFIG['cookie'] = os.getenv('NHENTAI_COOKIE') constant.CONFIG['cookie'] = os.getenv('NHENTAI_COOKIE')
constant.CONFIG['useragent'] = os.getenv('NHENTAI_UA') constant.CONFIG['useragent'] = os.getenv('NHENTAI_UA')
self.info = Doujinshi(**doujinshi_parser(did), name_format='%i')
def test_download(self): def test_download(self):
did = 440546 info = self.info
info = Doujinshi(**doujinshi_parser(did), name_format='%i')
info.downloader = Downloader(path='/tmp', threads=5) info.downloader = Downloader(path='/tmp', threads=5)
info.download() info.download()
self.assertTrue(os.path.exists(f'/tmp/{did}/001.jpg')) self.assertTrue(os.path.exists(f'/tmp/{did}/01.jpg'))
generate_html('/tmp', info) generate_html('/tmp', info)
self.assertTrue(os.path.exists(f'/tmp/{did}/index.html')) self.assertTrue(os.path.exists(f'/tmp/{did}/index.html'))
def test_zipfile_download(self):
info = self.info
info.downloader = CompressedDownloader(path='/tmp', threads=5)
info.download()
zipfile_path = f'/tmp/{did}.zip'
self.assertTrue(os.path.exists(zipfile_path))
self.assertTrue(is_zip_file(zipfile_path))
self.assertTrue(has_jepg_file(zipfile_path))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()