Initial commit: doujinshi-dl generic plugin framework

History reset as part of DMCA compliance. The project has been
refactored into a generic, site-agnostic download framework.
Site-specific logic lives in separate plugin packages.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ricter Zheng
2026-03-25 10:37:21 +08:00
commit 81d008036a
53 changed files with 4063 additions and 0 deletions

View File

@@ -0,0 +1 @@
# coding: utf-8

View File

@@ -0,0 +1,16 @@
# coding: utf-8
"""Runtime configuration store for the main package.
Plugins write their paths and settings here so that generic utilities
(e.g. db.py) can read them without hard-coding any plugin name.
"""
# Module-private mapping that backs the set()/get() API below.
_runtime: dict = {}


def set(key: str, value) -> None:
    """Store *value* under *key* in the runtime configuration."""
    _runtime.update({key: value})


def get(key: str, default=None):
    """Return the value stored under *key*, or *default* when unset."""
    try:
        return _runtime[key]
    except KeyError:
        return default

View File

@@ -0,0 +1,214 @@
# coding: utf-8
import os
import asyncio
import httpx
import urllib3.exceptions
import zipfile
import io
from urllib.parse import urlparse
from doujinshi_dl.core.logger import logger
from doujinshi_dl.core.utils.db import Singleton
from doujinshi_dl.core import config as core_config
async def _async_request(method, url, timeout=30, proxy=None):
    """Perform one HTTP request with plugin-supplied headers.

    A short-lived ``httpx.AsyncClient`` is created per call.  Cookie and
    User-Agent values are read from the ``plugin_config`` runtime-config
    entry, keeping this helper site-agnostic.

    :param method: HTTP verb, e.g. ``'GET'``.
    :param url: absolute URL to request.
    :param timeout: per-request timeout in seconds.
    :param proxy: optional proxy URL (httpx >= 0.28 ``proxy=`` keyword;
        older versions used ``proxies`` as a dict).
    :return: the ``httpx.Response``, with redirects followed.
    """
    # NOTE(review): verify=False disables TLS certificate verification on
    # every request -- presumably to tolerate mirrors with bad certs; confirm.
    client_kwargs = {'verify': False}
    if proxy:
        client_kwargs['proxy'] = proxy
    async with httpx.AsyncClient(**client_kwargs) as client:
        # Hoisted: read the plugin configuration once instead of per header.
        plugin_config = core_config.get('plugin_config', {})
        headers = {}
        cookie = plugin_config.get('cookie', '')
        useragent = plugin_config.get('useragent', '')
        if cookie:
            headers['Cookie'] = cookie
        if useragent:
            headers['User-Agent'] = useragent
        return await client.request(method, url, timeout=timeout, headers=headers, follow_redirects=True)
# Silence the InsecureRequestWarning spam caused by verify=False requests above.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def download_callback(result):
    """Log the outcome of one finished download task.

    *result* is the ``(status, data)`` pair produced by a downloader task.
    Known error codes get a warning; -3 means the worker never ran; any
    other status is reported as a successful download of *data*.
    """
    status, data = result
    warnings = {
        0: 'fatal errors occurred, ignored',
        -1: f'url {data} return status code 404',
        -2: 'Ctrl-C pressed, exiting sub processes ...',
    }
    if status in warnings:
        logger.warning(warnings[status])
    elif status == -3:
        # workers won't be run, just pass
        pass
    else:
        logger.log(16, f'{data} downloaded successfully')
class Downloader(Singleton):
    """Asynchronous image downloader with a bounded worker pool.

    Concurrency is capped by an ``asyncio.Semaphore`` of ``threads``
    permits.  ``start_download`` is the synchronous entry point: it builds
    one coroutine per URL and drives them all with ``asyncio.run``.
    """

    def __init__(self, path='', threads=5, timeout=30, delay=0, exit_on_fail=False,
                 no_filename_padding=False):
        self.threads = threads          # max concurrent downloads
        self.path = str(path)           # base output directory
        self.timeout = timeout          # per-request timeout in seconds
        self.delay = delay              # optional politeness delay per file
        self.exit_on_fail = exit_on_fail
        self.folder = None              # set by create_storage_object()
        self.semaphore = None           # created lazily in fiber()
        self.no_filename_padding = no_filename_padding

    async def fiber(self, tasks):
        """Await every download coroutine, logging each result as it lands.

        Re-raises on the first failure when ``exit_on_fail`` is set.
        """
        self.semaphore = asyncio.Semaphore(self.threads)
        for completed_task in asyncio.as_completed(tasks):
            try:
                result = await completed_task
                if result[0] > 0:
                    logger.info(f'{result[1]} download completed')
                else:
                    raise Exception(f'{result[1]} download failed, return value {result[0]}')
            except Exception as e:
                logger.error(f'An error occurred: {e}')
                if self.exit_on_fail:
                    raise Exception('User intends to exit on fail')

    async def _semaphore_download(self, *args, **kwargs):
        """Run download() while holding a semaphore permit."""
        async with self.semaphore:
            return await self.download(*args, **kwargs)

    async def download(self, url, folder='', filename='', retried=0, proxy=None, length=0):
        """Download one image, falling back to configured mirrors on non-200.

        :param length: zero-pad width applied to the filename's stem.
        :return: ``(status, url)`` -- 1 on success; -1 HTTP/save failure;
            -2 retries exhausted; -4 Ctrl-C; -9 unexpected exception.
        """
        logger.info(f'Starting to download {url} ...')

        if self.delay:
            await asyncio.sleep(self.delay)

        filename = filename if filename else os.path.basename(urlparse(url).path)
        base_filename, extension = os.path.splitext(filename)
        if not self.no_filename_padding:
            # Zero-pad so pages sort lexicographically (e.g. 001.jpg).
            filename = base_filename.zfill(length) + extension
        else:
            filename = base_filename + extension

        try:
            response = await _async_request('GET', url, timeout=self.timeout, proxy=proxy)

            if response.status_code != 200:
                path = urlparse(url).path
                image_url_mirrors = core_config.get('image_url_mirrors', [])
                for mirror in image_url_mirrors:
                    logger.info(f"Try mirror: {mirror}{path}")
                    mirror_url = f'{mirror}{path}'
                    response = await _async_request('GET', mirror_url, timeout=self.timeout, proxy=proxy)
                    if response.status_code == 200:
                        break

            # Bug fix: previously a still-non-200 response (no mirror
            # succeeded) fell through to save(), silently storing an error
            # page and reporting success.
            if response.status_code != 200:
                logger.error(f'Can not download image {url}, status code {response.status_code}')
                return -1, url

            if not await self.save(filename, response):
                logger.error(f'Can not download image {url}')
                return -1, url

        except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e:
            retry_times = core_config.get('retry_times', 3)
            if retried < retry_times:
                # Bug fix: messages previously logged a literal "(unknown)"
                # placeholder instead of identifying the failed file.
                logger.warning(f'Download {filename} failed ({e}), retrying({retried + 1}) times...')
                return await self.download(
                    url=url,
                    folder=folder,
                    filename=filename,
                    retried=retried + 1,
                    proxy=proxy,
                )
            else:
                logger.warning(f'Download {filename} failed with {retry_times} times retried, skipped')
                return -2, url
        except KeyboardInterrupt:
            return -4, url
        except Exception as e:
            # Unexpected failure: log the exception traceback.  Bug fix:
            # print_stack() showed the current call stack, not the error.
            import traceback
            logger.error(f"Exception type: {type(e)}")
            traceback.print_exc()
            logger.critical(str(e))
            return -9, url

        return 1, url

    async def save(self, filename, response) -> bool:
        """Write the response body to ``self.folder/filename``.

        Streams in 2 KiB chunks when Content-Length is known, otherwise
        writes the buffered body in one call.
        """
        if response is None:
            logger.error('Error: Response is None')
            return False
        save_file_path = os.path.join(self.folder, filename)
        with open(save_file_path, 'wb') as f:
            # (redundant inner `response is not None` re-check removed --
            # the early return above already guarantees a response here)
            length = response.headers.get('content-length')
            if length is None:
                f.write(response.content)
            else:
                async for chunk in response.aiter_bytes(2048):
                    f.write(chunk)
        return True

    def create_storage_object(self, folder: str):
        """Create the output directory and remember it in ``self.folder``."""
        if not os.path.exists(folder):
            try:
                os.makedirs(folder)
            except EnvironmentError as e:
                logger.critical(str(e))
        self.folder: str = folder
        self.close = lambda: None  # Only available in class CompressedDownloader

    def start_download(self, queue, folder='') -> bool:
        """Download every URL in *queue* into *folder* under ``self.path``.

        Always returns True; individual failures are logged (or raised via
        ``exit_on_fail``).
        """
        if not isinstance(folder, str):
            folder = str(folder)

        if self.path:
            folder = os.path.join(self.path, folder)

        logger.info(f'Doujinshi will be saved at "{folder}"')
        self.create_storage_object(folder)

        if os.getenv('DEBUG', None) == 'NODOWNLOAD':
            # Skip network work but let the caller continue its pipeline.
            return True

        # Pad width so every page filename sorts correctly.
        digit_length = len(str(len(queue)))
        logger.info(f'Total download pages: {len(queue)}')
        coroutines = [
            self._semaphore_download(url, filename=os.path.basename(urlparse(url).path), length=digit_length)
            for url in queue
        ]
        # Prevent coroutines infection
        asyncio.run(self.fiber(coroutines))
        self.close()
        return True
class CompressedDownloader(Downloader):
    """Downloader variant that streams images straight into a ``.zip``."""

    def create_storage_object(self, folder):
        """Open ``<folder>.zip`` for writing instead of making a directory."""
        filename = f'{folder}.zip'
        # Bug fix: stray debug print() replaced with a proper log record.
        logger.info(f'Creating archive {filename}')
        self.zipfile = zipfile.ZipFile(filename, 'w')
        self.close = lambda: self.zipfile.close()

    async def save(self, filename, response) -> bool:
        """Buffer the response body in memory and add it to the archive."""
        if response is None:
            logger.error('Error: Response is None')
            return False

        image_data = io.BytesIO()
        length = response.headers.get('content-length')
        if length is None:
            # Bug fix: httpx.Response.read() is synchronous, so awaiting its
            # bytes result raised TypeError; use .content like Downloader.save.
            image_data.write(response.content)
        else:
            async for chunk in response.aiter_bytes(2048):
                image_data.write(chunk)

        image_data.seek(0)
        self.zipfile.writestr(filename, image_data.read())
        return True

179
doujinshi_dl/core/logger.py Normal file
View File

@@ -0,0 +1,179 @@
#
# Copyright (C) 2010-2012 Vinay Sajip. All rights reserved. Licensed under the new BSD license.
#
import logging
import re
import platform
import sys
# The classic Windows console does not interpret ANSI escapes natively, so
# declare the Win32 SetConsoleTextAttribute prototype; the handler below
# translates ANSI color codes into console-attribute calls on Windows.
if platform.system() == 'Windows':
    import ctypes
    import ctypes.wintypes

    # Reference: https://gist.github.com/vsajip/758430
    # https://github.com/ipython/ipython/issues/4252
    # https://msdn.microsoft.com/en-us/library/windows/desktop/ms686047%28v=vs.85%29.aspx
    ctypes.windll.kernel32.SetConsoleTextAttribute.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.WORD]
    ctypes.windll.kernel32.SetConsoleTextAttribute.restype = ctypes.wintypes.BOOL
class ColorizingStreamHandler(logging.StreamHandler):
    """StreamHandler that renders log records in color.

    On POSIX terminals the ANSI escape sequences are written directly; on
    Windows they are translated into SetConsoleTextAttribute calls.  Based
    on Vinay Sajip's colorizing handler (see the reference links above).
    """
    # color names to indices
    color_map = {
        'black': 0,
        'red': 1,
        'green': 2,
        'yellow': 3,
        'blue': 4,
        'magenta': 5,
        'cyan': 6,
        'white': 7,
    }
    # levels to (background, foreground, bold/intense)
    level_map = {
        logging.DEBUG: (None, 'blue', False),
        logging.INFO: (None, 'white', False),
        logging.WARNING: (None, 'yellow', False),
        logging.ERROR: (None, 'red', False),
        logging.CRITICAL: ('red', 'white', False)
    }
    csi = '\x1b['      # ANSI Control Sequence Introducer
    reset = '\x1b[0m'  # ANSI "reset all attributes"
    disable_coloring = False

    @property
    def is_tty(self):
        """True when the stream is an interactive terminal and coloring is on."""
        isatty = getattr(self.stream, 'isatty', None)
        return isatty and isatty() and not self.disable_coloring

    def emit(self, record):
        """Format and write *record*, colorizing only on a real tty."""
        try:
            message = self.format(record)
            stream = self.stream
            if not self.is_tty:
                # The formatter prefixes '\r' for in-place tty updates;
                # strip it when writing to a non-interactive stream.
                if message and message[0] == "\r":
                    message = message[1:]
                stream.write(message)
            else:
                self.output_colorized(message)
            stream.write(getattr(self, 'terminator', '\n'))
            self.flush()
        except (KeyboardInterrupt, SystemExit):
            raise
        except IOError:
            # Broken pipe etc.: drop the record silently.
            pass
        except:
            self.handleError(record)

    if not platform.system() == 'Windows':
        def output_colorized(self, message):
            # POSIX terminals understand ANSI escapes natively.
            self.stream.write(message)
    else:
        # Matches ANSI SGR sequences such as '\x1b[31;1m'.
        ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m')
        # ANSI color index -> Windows console attribute bits.
        nt_color_map = {
            0: 0x00,    # black
            1: 0x04,    # red
            2: 0x02,    # green
            3: 0x06,    # yellow
            4: 0x01,    # blue
            5: 0x05,    # magenta
            6: 0x03,    # cyan
            7: 0x07,    # white
        }

        def output_colorized(self, message):
            """Write *message*, converting each ANSI SGR run into a console
            attribute change via SetConsoleTextAttribute."""
            parts = self.ansi_esc.split(message)
            write = self.stream.write
            h = None
            fd = getattr(self.stream, 'fileno', None)
            if fd is not None:
                fd = fd()
                if fd in (1, 2):  # stdout or stderr
                    # STD_OUTPUT_HANDLE is -11, STD_ERROR_HANDLE is -12.
                    h = ctypes.windll.kernel32.GetStdHandle(-10 - fd)
            # re.split alternates literal text and captured SGR parameters.
            while parts:
                text = parts.pop(0)
                if text:
                    if sys.version_info < (3, 0, 0):
                        # Legacy Python 2 path; dead on Python 3.
                        write(text.encode('utf-8'))
                    else:
                        write(text)
                if parts:
                    params = parts.pop(0)
                    if h is not None:
                        params = [int(p) for p in params.split(';')]
                        color = 0
                        for p in params:
                            if 40 <= p <= 47:
                                # SGR background color -> high attribute nibble.
                                color |= self.nt_color_map[p - 40] << 4
                            elif 30 <= p <= 37:
                                # SGR foreground color -> low attribute nibble.
                                color |= self.nt_color_map[p - 30]
                            elif p == 1:
                                color |= 0x08  # foreground intensity on
                            elif p == 0:  # reset to default color
                                color = 0x07
                            else:
                                pass  # error condition ignored
                        ctypes.windll.kernel32.SetConsoleTextAttribute(h, color)

    def colorize(self, message, record):
        """Wrap *message* in the ANSI escape mapped to the record's level."""
        if record.levelno in self.level_map and self.is_tty:
            bg, fg, bold = self.level_map[record.levelno]
            params = []
            if bg in self.color_map:
                params.append(str(self.color_map[bg] + 40))
            if fg in self.color_map:
                params.append(str(self.color_map[fg] + 30))
            if bold:
                params.append('1')
            if params and message:
                # Keep any leading whitespace (e.g. '\r') outside the escape
                # so terminals handle the control characters normally.
                if message.lstrip() != message:
                    prefix = re.search(r"\s+", message).group(0)
                    message = message[len(prefix):]
                else:
                    prefix = ""
                message = "%s%s" % (prefix, ''.join((self.csi, ';'.join(params),
                                                     'm', message, self.reset)))
        return message

    def format(self, record):
        """Format via the base handler, then apply level coloring."""
        message = logging.StreamHandler.format(self, record)
        return self.colorize(message, record)
# Custom log level for success messages; 16 sits between DEBUG(10) and INFO(20).
logging.addLevelName(16, "SUCCESS")
logger = logging.getLogger('doujinshi_dl')
LOGGER_HANDLER = ColorizingStreamHandler(sys.stdout)
# Leading '\r' lets progress-style records overwrite the current tty line.
FORMATTER = logging.Formatter("\r[%(asctime)s] %(funcName)s: %(message)s", "%H:%M:%S")
LOGGER_HANDLER.setFormatter(FORMATTER)
# Render SUCCESS-level records in green.
LOGGER_HANDLER.level_map[logging.getLevelName("SUCCESS")] = (None, "green", False)
logger.addHandler(LOGGER_HANDLER)
logger.setLevel(logging.DEBUG)
if __name__ == '__main__':
    # Manual smoke test: emit one record per level.
    logger.log(16, 'doujinshi-dl')
    logger.info('info')
    logger.warning('warning')
    logger.debug('debug')
    logger.error('error')
    logger.critical('critical')

View File

@@ -0,0 +1,77 @@
# coding: utf-8
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Iterator, Tuple
@dataclass
class GalleryMeta:
    """Site-agnostic description of a single gallery produced by a parser."""
    id: str
    name: str
    pretty_name: str
    img_id: str
    ext: list
    pages: int
    info: Dict[str, Any] = field(default_factory=dict)
    extra: Dict[str, Any] = field(default_factory=dict)  # plugin-private data

    def to_dict(self) -> dict:
        """Flatten the core fields plus ``info`` and ``extra`` into one dict.

        Later sources win on key collisions: extra > info > core fields.
        """
        core = {
            'id': self.id,
            'name': self.name,
            'pretty_name': self.pretty_name,
            'img_id': self.img_id,
            'ext': self.ext,
            'pages': self.pages,
        }
        return {**core, **self.info, **self.extra}
class BaseParser(ABC):
    """Interface every site plugin's parser must implement."""

    @abstractmethod
    def fetch(self, gallery_id: str) -> GalleryMeta:
        """Return the metadata for a single gallery."""

    @abstractmethod
    def search(self, keyword: str, sorting: str = 'date', page=None, **kwargs) -> List[Dict]:
        """Return search results for *keyword* as a list of dicts."""

    def favorites(self, page=None) -> List[Dict]:
        """Return the user's favorites; plugins without auth return []."""
        return []

    def configure(self, args):
        """Optional hook: absorb parsed CLI arguments."""
class BaseModel(ABC):
    """Interface for the object that enumerates download tasks."""

    @abstractmethod
    def iter_tasks(self) -> Iterator[Tuple[str, str]]:
        """Yield ``(url, filename)`` pairs, one per page to download."""
class BaseSerializer(ABC):
    """Interface for writers that persist gallery metadata to disk."""

    @abstractmethod
    def write_all(self, meta: GalleryMeta, output_dir: str):
        """Write every supported metadata format into *output_dir*."""

    def finalize(self, output_dir: str) -> None:
        """Optional post-processing hook; the default does nothing."""
class BasePlugin(ABC):
    """Entry point a site plugin package exposes to the framework."""

    name: str  # short plugin identifier (e.g. the PyPI package suffix)

    @abstractmethod
    def create_parser(self) -> BaseParser:
        """Return the plugin's parser instance."""

    @abstractmethod
    def create_model(self, meta: GalleryMeta, name_format: str = '[%i][%a][%t]') -> BaseModel:
        """Return a task model for *meta*, naming files per *name_format*."""

    @abstractmethod
    def create_serializer(self) -> BaseSerializer:
        """Return the plugin's metadata serializer."""

    def register_args(self, argparser):
        """Optional hook: add plugin-specific CLI arguments."""

    def check_auth(self) -> None:
        """Optional hook: validate credentials before downloading."""

    def print_results(self, results) -> None:
        """Optional hook: pretty-print search results."""

View File

@@ -0,0 +1,28 @@
# coding: utf-8
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from doujinshi_dl.core.plugin import BasePlugin
def get_plugin(name: str) -> 'BasePlugin':
from importlib.metadata import entry_points
eps = entry_points(group='doujinshi_dl.plugins')
for ep in eps:
if ep.name == name:
return ep.load()
raise KeyError(
f"Plugin {name!r} not found. "
f"Install it with: pip install doujinshi-dl-{name}"
)
def get_first_plugin() -> 'BasePlugin':
from importlib.metadata import entry_points
eps = list(entry_points(group='doujinshi_dl.plugins'))
if not eps:
raise RuntimeError(
"No doujinshi-dl plugin installed. "
"Install a plugin from PyPI, e.g.: pip install doujinshi-dl-<name>"
)
return eps[0].load()

View File

@@ -0,0 +1,5 @@
# coding: utf-8
from doujinshi_dl.core.utils.db import Singleton, DB
from doujinshi_dl.core.utils.fs import format_filename, generate_cbz, move_to_folder, parse_doujinshi_obj, EXTENSIONS
from doujinshi_dl.core.utils.html import generate_html, generate_main_html
from doujinshi_dl.core.utils.http import async_request

View File

@@ -0,0 +1,50 @@
# coding: utf-8
"""DB and Singleton utilities."""
import os
import sqlite3
class _Singleton(type):
""" A metaclass that creates a Singleton base class when called. """
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
# Concrete base class: inherit from Singleton to get one-instance-per-class
# behavior without referencing the metaclass directly.
class Singleton(_Singleton(str('SingletonMeta'), (object,), {})):
    pass
class DB(object):
    """Tiny sqlite-backed download-history store, used as a context manager.

    Usage::

        with DB() as db:
            db.add_one('12345')

    The database path is read from the runtime-config key ``history_path``
    and defaults to ``~/.doujinshi-dl/history.sqlite3``.
    """
    conn = None
    cur = None

    def __enter__(self):
        from doujinshi_dl.core import config
        history_path = config.get(
            'history_path',
            os.path.expanduser('~/.doujinshi-dl/history.sqlite3'),
        )
        # Robustness fix: sqlite3.connect fails if the parent directory is
        # missing (e.g. on first run) -- create it on demand.
        parent_dir = os.path.dirname(history_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.conn = sqlite3.connect(history_path)
        self.cur = self.conn.cursor()
        self.cur.execute('CREATE TABLE IF NOT EXISTS download_history (id text)')
        self.conn.commit()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()

    def clean_all(self):
        """Delete every recorded download id."""
        self.cur.execute('DELETE FROM download_history WHERE 1')
        self.conn.commit()

    def add_one(self, data):
        """Record *data* (a gallery id) as downloaded."""
        self.cur.execute('INSERT INTO download_history VALUES (?)', [data])
        self.conn.commit()

    def get_all(self):
        """Return every recorded id as a list."""
        data = self.cur.execute('SELECT id FROM download_history')
        return [i[0] for i in data]

View File

@@ -0,0 +1,98 @@
# coding: utf-8
"""Filesystem utilities: filename formatting, CBZ generation, folder management."""
import os
import zipfile
import shutil
from typing import Tuple
from doujinshi_dl.core.logger import logger
from doujinshi_dl.constant import PATH_SEPARATOR
# Maximum length of a formatted filename field.
MAX_FIELD_LENGTH = 100
# Image extensions recognized as downloadable pages.
EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.webp')


def format_filename(s, length=MAX_FIELD_LENGTH, _truncate_only=False):
    """Return *s* sanitized for use as a filename.

    Forbidden characters (path separators, shell/quote metacharacters and
    control bytes) are replaced with spaces, whitespace runs are collapsed,
    trailing dots are stripped, empty ``[]`` pairs are removed, and the
    result is capped at ``length - 1`` characters.  It used to be a
    whitelist approach allowing only ASCII, but most doujinshi names
    include Japanese 2-byte characters, so a blacklist is used instead.

    :param s: raw title or name.
    :param length: maximum field length (result is at most length-1 chars).
    :param _truncate_only: skip sanitization and only apply the length cap.
    """
    if not _truncate_only:
        ban_chars = '\\\'/:,;*?"<>|\t\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b'
        filename = s.translate(str.maketrans(ban_chars, ' ' * len(ban_chars))).strip()
        filename = ' '.join(filename.split())
        # Windows rejects names ending in '.'; drop all trailing dots.
        filename = filename.rstrip('.')
    else:
        filename = s

    # Cap the length (fix: dropped a no-op `+ u''` concatenation).
    if len(filename) >= length:
        filename = filename[:length - 1]

    # Remove empty bracket pairs left behind by tag stripping.
    filename = filename.replace('[]', '').strip()
    return filename
def parse_doujinshi_obj(
    output_dir: str,
    doujinshi_obj=None,
    file_type: str = ''
) -> Tuple[str, str]:
    """Resolve the target directory and output filename for an export.

    With a doujinshi object, the file is named after it inside
    *output_dir*; without one, the current directory and a generic
    ``doujinshi.<type>`` name are used (HTML export is special-cased to
    ``index.html``).  The resolved directory is created if missing.
    """
    filename = f'.{PATH_SEPARATOR}doujinshi.{file_type}'
    if doujinshi_obj is None:
        if file_type == 'html':
            return output_dir, 'index.html'
        doujinshi_dir = f'.{PATH_SEPARATOR}'
    else:
        doujinshi_dir = os.path.join(output_dir, doujinshi_obj.filename)
        _filename = f'{doujinshi_obj.filename}.{file_type}'
        if file_type == 'pdf':
            # A '/' in the title would otherwise split the name into paths.
            _filename = _filename.replace('/', '-')
        filename = os.path.join(output_dir, _filename)
    if not os.path.exists(doujinshi_dir):
        os.makedirs(doujinshi_dir)
    return doujinshi_dir, filename
def generate_cbz(doujinshi_dir, filename):
    """Pack every file in *doujinshi_dir* into the CBZ archive *filename*.

    Files are added in sorted (page) order, using their bare names as
    archive member names.
    """
    file_list = os.listdir(doujinshi_dir)
    file_list.sort()

    # Bug fix: both log messages previously contained a literal "(unknown)"
    # placeholder instead of the actual archive path.
    logger.info(f'Writing CBZ file to path: {filename}')
    with zipfile.ZipFile(filename, 'w') as cbz_pf:
        for image in file_list:
            image_path = os.path.join(doujinshi_dir, image)
            cbz_pf.write(image_path, image)
    logger.log(16, f'Comic Book CBZ file has been written to "{filename}"')
def move_to_folder(output_dir='.', doujinshi_obj=None, file_type=None):
    """Move the generated ``<name>.<file_type>`` into the doujinshi folder.

    All other non-PDF/CBZ files in the folder (the already-packed page
    images) are deleted first, best-effort.

    :raises RuntimeError: when *file_type* is not given.
    """
    if not file_type:
        raise RuntimeError('no file_type specified')

    doujinshi_dir, filename = parse_doujinshi_obj(output_dir, doujinshi_obj, file_type)

    for fn in os.listdir(doujinshi_dir):
        file_path = os.path.join(doujinshi_dir, fn)
        _, ext = os.path.splitext(file_path)
        if ext in ('.pdf', '.cbz'):
            continue
        if os.path.isfile(file_path):
            try:
                os.remove(file_path)
            except OSError as e:
                # Consistency fix: report through the module logger instead
                # of a bare print(); deletion stays best-effort.
                logger.error(f"Error deleting file: {e}")

    shutil.move(filename, os.path.join(doujinshi_dir, os.path.basename(filename)))

View File

@@ -0,0 +1,118 @@
# coding: utf-8
"""HTML viewer generation utilities (generic, no site-specific references)."""
import json
import os
import urllib.parse
from doujinshi_dl.core.logger import logger
from doujinshi_dl.core.utils.fs import EXTENSIONS, parse_doujinshi_obj
from doujinshi_dl.constant import PATH_SEPARATOR
def _readfile(path):
    """Return the text of *path*, resolved relative to the package root.

    Viewer assets ship inside the ``doujinshi_dl`` package; three
    ``dirname`` hops from this module land on that package directory.
    """
    loc = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))  # doujinshi_dl/
    # Fix: read with an explicit encoding so the viewer templates decode
    # identically regardless of the platform's locale default.
    with open(os.path.join(loc, path), 'r', encoding='utf-8') as file:
        return file.read()
def generate_html(output_dir='.', doujinshi_obj=None, template='default'):
    """Render a single-gallery HTML viewer into the doujinshi's folder.

    Inlines the template's CSS/JS and emits one ``<img>`` tag per
    downloaded image (sorted by filename).  The page title comes from the
    doujinshi object, a previously written ``metadata.json``, or a generic
    fallback, in that order.
    """
    doujinshi_dir, filename = parse_doujinshi_obj(output_dir, doujinshi_obj, 'html')
    image_html = ''
    if not os.path.exists(doujinshi_dir):
        logger.warning(f'Path "{doujinshi_dir}" does not exist, creating.')
        try:
            os.makedirs(doujinshi_dir)
        except EnvironmentError as e:
            logger.critical(e)
    file_list = os.listdir(doujinshi_dir)
    file_list.sort()
    for image in file_list:
        # Skip non-image artifacts (metadata.json, generated files, ...).
        if not os.path.splitext(image)[1] in EXTENSIONS:
            continue
        image_html += f'<img src="{image}" class="image-item"/>\n'
    html = _readfile(f'viewer/{template}/index.html')
    css = _readfile(f'viewer/{template}/styles.css')
    js = _readfile(f'viewer/{template}/scripts.js')
    if doujinshi_obj is not None:
        name = doujinshi_obj.name
    else:
        # Fall back to metadata written by a serializer in an earlier run.
        metadata_path = os.path.join(doujinshi_dir, "metadata.json")
        if os.path.exists(metadata_path):
            with open(metadata_path, 'r') as file:
                doujinshi_info = json.loads(file.read())
            name = doujinshi_info.get("title")
        else:
            name = 'Doujinshi HTML Viewer'
    # NOTE(review): str.format requires every literal brace in the template's
    # CSS/JS to be escaped as '{{'/'}}' -- confirm the templates comply.
    data = html.format(TITLE=name, IMAGES=image_html, SCRIPTS=js, STYLES=css)
    try:
        with open(os.path.join(doujinshi_dir, 'index.html'), 'wb') as f:
            f.write(data.encode('utf-8'))
        logger.log(16, f'HTML Viewer has been written to "{os.path.join(doujinshi_dir, "index.html")}"')
    except Exception as e:
        logger.warning(f'Writing HTML Viewer failed ({e})')
def generate_main_html(output_dir=f'.{PATH_SEPARATOR}'):
    """
    Generate a main html to show all the contained doujinshi.
    With a link to their `index.html`.
    Default output folder will be the CLI path.
    """
    import shutil
    image_html = ''
    main = _readfile('viewer/main.html')
    css = _readfile('viewer/main.css')
    js = _readfile('viewer/main.js')
    # Card template; FOLDER/IMAGE/TITLE are filled per doujinshi below.
    element = '\n\
    <div class="gallery-favorite">\n\
        <div class="gallery">\n\
            <a href="./{FOLDER}/index.html" class="cover" style="padding:0 0 141.6% 0"><img\n\
                    src="./{FOLDER}/{IMAGE}" />\n\
                <div class="caption">{TITLE}</div>\n\
            </a>\n\
        </div>\n\
    </div>\n'
    # NOTE(review): this permanently changes the process working directory;
    # any caller relying on relative paths afterwards is affected -- confirm.
    os.chdir(output_dir)
    doujinshi_dirs = next(os.walk('.'))[1]
    for folder in doujinshi_dirs:
        files = os.listdir(folder)
        files.sort()
        # Only list folders that already contain a generated viewer page.
        if 'index.html' in files:
            logger.info(f'Add doujinshi "{folder}"')
        else:
            continue
        # assumes the first sorted entry is a page image -- TODO confirm no
        # earlier-sorting non-image file can appear in these folders.
        image = files[0]  # 001.jpg or 001.png
        # NOTE(review): os.walk yields str, so this branch is always taken;
        # the else arm appears to be dead code.
        if folder is not None:
            title = folder.replace('_', ' ')
        else:
            title = 'Doujinshi HTML Viewer'
        image_html += element.format(FOLDER=urllib.parse.quote(folder), IMAGE=image, TITLE=title)
    if image_html == '':
        logger.warning('No index.html found, --gen-main paused.')
        return
    try:
        data = main.format(STYLES=css, SCRIPTS=js, PICTURE=image_html)
        with open('./main.html', 'wb') as f:
            f.write(data.encode('utf-8'))
        # The logo ships with the package; copy it next to main.html.
        pkg_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        shutil.copy(os.path.join(pkg_dir, 'viewer/logo.png'), './')
        output_dir = output_dir[:-1] if output_dir.endswith('/') else output_dir
        logger.log(16, f'Main Viewer has been written to "{output_dir}/main.html"')
    except Exception as e:
        logger.warning(f'Writing Main Viewer failed ({e})')

View File

@@ -0,0 +1,34 @@
# coding: utf-8
"""Generic async HTTP request helper (no site-specific headers injected here)."""
import httpx
import urllib3.exceptions
# Silence the InsecureRequestWarning spam caused by verify=False below.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def async_request(method, url, proxy=None, **kwargs):
    """
    Thin async HTTP client wrapper.

    Header injection (Cookie, User-Agent, Referer) is done by callers that
    have access to site-specific configuration; this helper stays generic.
    """
    from doujinshi_dl import constant

    headers = kwargs.pop('headers', {})

    # Fall back to the configured proxy; an empty string means "no proxy".
    if proxy is None:
        proxy = constant.CONFIG.get('proxy', '')
    if isinstance(proxy, str) and not proxy:
        proxy = None

    # httpx.AsyncClient accepts timeout on the client or per request; pop it
    # from kwargs so it is never passed twice.
    timeout = kwargs.pop('timeout', 30)

    client_opts = dict(headers=headers, verify=False, proxy=proxy, timeout=timeout)
    async with httpx.AsyncClient(**client_opts) as client:
        return await client.request(method, url, **kwargs)