"""全流程编排器
串联 搜索 → 下载 → 图片处理 → KEPUB 打包 的完整流水线。
"""
from pathlib import Path
from kobo_manga.config import AppConfig
from kobo_manga.converter.kepub import KepubBuilder
from kobo_manga.db.database import Database
from kobo_manga.db.queries import (
get_downloaded_chapter_ids,
get_manga,
get_manga_chapter_stats,
update_subscription_checked,
upsert_chapters,
upsert_manga,
)
from kobo_manga.downloader.engine import DownloadEngine
from kobo_manga.models import Chapter, DownloadResult, MangaInfo
from kobo_manga.processor.pipeline import ImageProcessor
from kobo_manga.sources import get_source
class MangaPipeline:
    """Orchestrate the full flow: search -> download -> process -> package.

    The pipeline ties together a manga source, the download engine, the
    image processor and the KEPUB builder. Working directories are derived
    from ``base_dir``:

    * ``<base_dir>/downloads``  - raw downloaded images
    * ``<base_dir>/processed``  - images after processing
    * ``<base_dir>/output``     - generated ``.kepub.epub`` files
    """

    # Source used whenever the configuration does not list any source.
    DEFAULT_SOURCE = "manhuagui"

    def __init__(
        self,
        config: AppConfig,
        db: Database,
        base_dir: Path = Path("."),
    ):
        """Store the collaborators and derive the working directories.

        Args:
            config: Application configuration (sources, processing, device).
            db: Database handle passed to the query helpers and the engine.
            base_dir: Root under which ``downloads`` and ``output`` live.
        """
        self.config = config
        self.db = db
        self.downloads_dir = base_dir / "downloads"
        self.output_dir = base_dir / "output"

    def _source_name(self) -> str:
        """Return the first configured source, or the default source."""
        sources = self.config.sources
        return sources[0] if sources else self.DEFAULT_SOURCE

    def _build_engine(self, source) -> DownloadEngine:
        """Create a download engine bound to an already opened *source*."""
        return DownloadEngine(
            db=self.db,
            source=source,
            config=self.config,
            base_dir=self.downloads_dir,
        )

    async def search(
        self, keyword: str, source_name: str | None = None
    ) -> list[MangaInfo]:
        """Search for manga by keyword.

        Args:
            keyword: Search term forwarded to the source.
            source_name: Source to query. When falsy, every configured
                source is queried (or the default source if none is
                configured) and the results are concatenated.

        Returns:
            The matching manga. In aggregated mode a failing source is
            reported on stdout and skipped; with an explicit
            ``source_name`` errors propagate to the caller.
        """
        if source_name:
            async with get_source(source_name) as source:
                return await source.search(keyword)

        # Aggregated multi-source search: best effort, one source failing
        # must not discard the results of the others.
        results: list[MangaInfo] = []
        for name in self.config.sources or [self.DEFAULT_SOURCE]:
            try:
                async with get_source(name) as source:
                    results.extend(await source.search(keyword))
            except Exception as e:
                print(f" [!] {name} 搜索失败: {e}")
        return results

    async def get_manga_info(
        self, manga_url: str, source_name: str | None = None
    ) -> MangaInfo:
        """Fetch the manga details from the source and upsert them in the DB.

        Args:
            manga_url: URL of the manga page.
            source_name: Source to use; falls back to the default source.

        Returns:
            The manga object returned by the source.
        """
        name = source_name or self._source_name()
        async with get_source(name) as source:
            manga = await source.get_manga_info(manga_url)
        upsert_manga(self.db, manga)
        return manga

    async def download_and_convert(
        self,
        manga_url: str,
        source_name: str | None = None,
        chapter_range: tuple[float, float] | None = None,
        chapter_ids: list[str] | None = None,
        chapter_type: str | None = None,
    ) -> list[Path]:
        """Run the full flow: download -> image processing -> KEPUB packaging.

        Args:
            manga_url: URL of the manga.
            source_name: Source name; ``None`` selects the default source.
            chapter_range: Chapter number range ``(start, end)``, inclusive.
            chapter_ids: Explicit list of chapter IDs to download.
            chapter_type: Chapter type filter (volume/chapter/extra).

        Returns:
            Paths of the ``.kepub.epub`` files for the downloaded chapters.
        """
        name = source_name or self._source_name()
        async with get_source(name) as source:
            engine = self._build_engine(source)
            result = await engine.download_manga(
                manga_url,
                chapter_range=chapter_range,
                chapter_ids=chapter_ids,
                chapter_type=chapter_type,
            )
        return self._process_results(result)

    def _process_results(self, result: DownloadResult) -> list[Path]:
        """Process and package every successfully downloaded chapter.

        Only chapter results whose status is ``"downloaded"`` and that
        carry a download path are considered. A chapter whose expected
        KEPUB file already exists is skipped (its path is still returned).
        A failure in one chapter is reported on stdout and does not stop
        the remaining chapters.

        Returns:
            Paths of the existing or newly built KEPUB files.
        """
        kepub_paths: list[Path] = []
        successful = [
            cr for cr in result.chapter_results
            if cr.status == "downloaded" and cr.download_path
        ]
        if not successful:
            return kepub_paths

        # Loop invariants: the sanitized title and the chapter count.
        manga_title = _sanitize_filename(result.manga.title)
        manga_output = self.output_dir / manga_title
        total = len(successful)

        for i, cr in enumerate(successful, 1):
            # The chapter ID is part of the name so that chapters sharing
            # a title do not collide.
            # NOTE(review): the skip check assumes KepubBuilder.build
            # writes its file under exactly this name - confirm.
            kepub_name = (
                f"{manga_title} - "
                f"{cr.chapter.id} {_sanitize_filename(cr.chapter.title)}.kepub.epub"
            )
            kepub_path = manga_output / kepub_name
            if kepub_path.exists():
                print(f" [SKIP] 已存在: {cr.chapter.title}")
                kepub_paths.append(kepub_path)
                continue

            print(
                f"\n[{i}/{total}] "
                f"处理+打包: {cr.chapter.title}"
            )
            try:
                path = self._process_and_convert(
                    result.manga, cr.chapter, cr.download_path
                )
            except Exception as e:
                # Best effort: keep going with the remaining chapters.
                print(f" [!] 处理失败: {e}")
            else:
                kepub_paths.append(path)
                print(f" [OK] {path.name}")
        return kepub_paths

    def _process_and_convert(
        self,
        manga: MangaInfo,
        chapter: Chapter,
        download_path: Path,
    ) -> Path:
        """Process the images of one chapter and package them as a KEPUB.

        Args:
            manga: Manga the chapter belongs to.
            chapter: Chapter being converted.
            download_path: Directory holding the downloaded images.

        Returns:
            Path returned by ``KepubBuilder.build``.

        Raises:
            ValueError: If ``download_path`` contains no image files.
        """
        processor = ImageProcessor(
            self.config.processing, self.config.device
        )

        # Collect the downloaded image files in path order; directories
        # that merely carry an image suffix are ignored.
        image_suffixes = (".jpg", ".jpeg", ".png", ".webp")
        image_paths = sorted(
            p for p in download_path.iterdir()
            if p.suffix.lower() in image_suffixes and p.is_file()
        )
        if not image_paths:
            raise ValueError(f"目录中无图片文件: {download_path}")

        # Image processing.
        # NOTE(review): this directory is keyed by chapter title only, so
        # two chapters with the same title would share it - confirm that
        # this is acceptable for ImageProcessor.process_chapter.
        processed_dir = (
            self.downloads_dir.parent
            / "processed"
            / _sanitize_filename(manga.title)
            / _sanitize_filename(chapter.title)
        )
        processed_paths = processor.process_chapter(image_paths, processed_dir)
        print(f" 处理: {len(image_paths)} -> {len(processed_paths)} 张")

        # KEPUB packaging.
        output_dir = self.output_dir / _sanitize_filename(manga.title)
        builder = KepubBuilder(manga, chapter, self.config.device)
        kepub_path = builder.build(processed_paths, output_dir)
        print(f" 打包: {kepub_path.name}")
        return kepub_path

    async def check_and_download_updates(
        self, manga_id: str, source_name: str
    ) -> list[Path]:
        """Check a subscribed manga for new chapters, download and convert.

        A chapter counts as new when its ID is not among the IDs returned
        by ``get_downloaded_chapter_ids``.

        Args:
            manga_id: ID of the manga as stored in the database.
            source_name: Name of the source the manga belongs to.

        Returns:
            Paths of the ``.kepub.epub`` files for the new chapters; an
            empty list when the manga is unknown or has no new chapters.
        """
        # 1. Load the manga record from the DB.
        manga = get_manga(self.db, manga_id, source_name)
        if not manga:
            print(f" [!] 漫画不存在: {manga_id} ({source_name})")
            return []

        # A single source session serves both the chapter-list refresh
        # and the download.
        async with get_source(source_name) as source:
            # 2. Fetch the latest chapter list.
            fresh_manga = await source.get_manga_info(manga.url)

            # 3. Update the DB.
            upsert_manga(self.db, fresh_manga)
            upsert_chapters(
                self.db, fresh_manga.id, fresh_manga.source, fresh_manga.chapters
            )

            # 4. Determine the new (not yet downloaded) chapters.
            downloaded_ids = get_downloaded_chapter_ids(
                self.db, fresh_manga.id, fresh_manga.source
            )
            new_chapters = [
                ch for ch in fresh_manga.chapters
                if ch.id not in downloaded_ids
            ]
            if not new_chapters:
                print(f" [OK] {fresh_manga.title}: 无新章节")
                update_subscription_checked(self.db, manga_id, source_name)
                return []
            print(f" 发现 {len(new_chapters)} 个新章节")

            # 5. Download the new chapters.
            engine = self._build_engine(source)
            result = await engine.download_manga(
                fresh_manga.url,
                chapter_ids=[ch.id for ch in new_chapters],
            )

        # 6. Process and package.
        kepub_paths = self._process_results(result)

        # 7. Update the subscription state; new_chapters is non-empty here.
        update_subscription_checked(
            self.db, manga_id, source_name, last_chapter_id=new_chapters[-1].id
        )
        return kepub_paths

    def get_stats(self, manga: MangaInfo) -> dict:
        """Return the chapter statistics stored in the DB for *manga*."""
        return get_manga_chapter_stats(self.db, manga.id, manga.source)
def _sanitize_filename(name: str) -> str:
"""清理文件名中的非法字符。"""
return "".join(
c if c.isalnum() or c in " _-()()【】" else "_" for c in name
)