"""全流程编排器 串联 搜索 → 下载 → 图片处理 → KEPUB 打包 的完整流水线。 """ from pathlib import Path from kobo_manga.config import AppConfig from kobo_manga.converter.kepub import KepubBuilder from kobo_manga.db.database import Database from kobo_manga.db.queries import ( get_downloaded_chapter_ids, get_manga, get_manga_chapter_stats, update_subscription_checked, upsert_chapters, upsert_manga, ) from kobo_manga.downloader.engine import DownloadEngine from kobo_manga.models import Chapter, DownloadResult, MangaInfo from kobo_manga.processor.pipeline import ImageProcessor from kobo_manga.sources import get_source class MangaPipeline: """全流程编排:搜索 → 下载 → 处理 → 打包。""" def __init__( self, config: AppConfig, db: Database, base_dir: Path = Path("."), ): self.config = config self.db = db self.downloads_dir = base_dir / "downloads" self.output_dir = base_dir / "output" def _source_name(self) -> str: """获取当前配置的源名称。""" return self.config.sources[0] if self.config.sources else "manhuagui" async def search( self, keyword: str, source_name: str | None = None ) -> list[MangaInfo]: """搜索漫画。source_name=None 时搜索所有已配置的源。""" if source_name: async with get_source(source_name) as source: return await source.search(keyword) # 多源聚合搜索 results: list[MangaInfo] = [] for name in self.config.sources or ["manhuagui"]: try: async with get_source(name) as source: results.extend(await source.search(keyword)) except Exception as e: print(f" [!] {name} 搜索失败: {e}") return results async def get_manga_info( self, manga_url: str, source_name: str | None = None ) -> MangaInfo: """获取漫画详情(含章节列表)。""" name = source_name or self._source_name() async with get_source(name) as source: manga = await source.get_manga_info(manga_url) upsert_manga(self.db, manga) return manga async def download_and_convert( self, manga_url: str, source_name: str | None = None, chapter_range: tuple[float, float] | None = None, chapter_ids: list[str] | None = None, chapter_type: str | None = None, ) -> list[Path]: """全流程:下载 → 图片处理 → KEPUB 打包。 Args: manga_url: 漫画 URL source_name: 源名称,None 则用默认源 chapter_range: 章节号范围 (start, end),闭区间 chapter_ids: 指定章节 ID 列表 chapter_type: 章节类型筛选 (volume/chapter/extra) Returns: 生成的 .kepub.epub 文件路径列表 """ name = source_name or self._source_name() async with get_source(name) as source: engine = DownloadEngine( db=self.db, source=source, config=self.config, base_dir=self.downloads_dir, ) result = await engine.download_manga( manga_url, chapter_range=chapter_range, chapter_ids=chapter_ids, chapter_type=chapter_type, ) return self._process_results(result) def _process_results(self, result: DownloadResult) -> list[Path]: """对下载成功的章节执行图片处理和 KEPUB 打包。""" kepub_paths = [] processor = ImageProcessor( self.config.processing, self.config.device ) successful = [ cr for cr in result.chapter_results if cr.status == "downloaded" and cr.download_path ] if not successful: return kepub_paths manga_output = self.output_dir / _sanitize_filename(result.manga.title) for i, cr in enumerate(successful, 1): # 检查是否已有 KEPUB(用章节 ID 避免同名冲突) kepub_name = ( f"{_sanitize_filename(result.manga.title)} - " f"{cr.chapter.id} {_sanitize_filename(cr.chapter.title)}.kepub.epub" ) kepub_path = manga_output / kepub_name if kepub_path.exists(): print(f" [SKIP] 已存在: {cr.chapter.title}") kepub_paths.append(kepub_path) continue print( f"\n[{i}/{len(successful)}] " f"处理+打包: {cr.chapter.title}" ) try: path = self._process_and_convert( result.manga, cr.chapter, cr.download_path ) kepub_paths.append(path) print(f" [OK] {path.name}") except Exception as e: print(f" [!] 处理失败: {e}") return kepub_paths def _process_and_convert( self, manga: MangaInfo, chapter: Chapter, download_path: Path, ) -> Path: """处理图片 + 打包 KEPUB(单章节)。""" processor = ImageProcessor( self.config.processing, self.config.device ) # 收集下载的图片文件 image_paths = sorted( p for p in download_path.iterdir() if p.suffix.lower() in (".jpg", ".jpeg", ".png", ".webp") ) if not image_paths: raise ValueError(f"目录中无图片文件: {download_path}") # 图片处理 processed_dir = ( self.downloads_dir.parent / "processed" / _sanitize_filename(manga.title) / _sanitize_filename(chapter.title) ) processed_paths = processor.process_chapter(image_paths, processed_dir) print(f" 处理: {len(image_paths)} -> {len(processed_paths)} 张") # KEPUB 打包 output_dir = self.output_dir / _sanitize_filename(manga.title) builder = KepubBuilder(manga, chapter, self.config.device) kepub_path = builder.build(processed_paths, output_dir) print(f" 打包: {kepub_path.name}") return kepub_path async def check_and_download_updates( self, manga_id: str, source_name: str ) -> list[Path]: """检查订阅漫画的新章节并下载转换。 Returns: 新生成的 .kepub.epub 路径列表 """ # 1. 从 DB 获取漫画记录 manga = get_manga(self.db, manga_id, source_name) if not manga: print(f" [!] 漫画不存在: {manga_id} ({source_name})") return [] # 2. 拉取最新章节列表 async with get_source(source_name) as source: fresh_manga = await source.get_manga_info(manga.url) # 3. 更新 DB upsert_manga(self.db, fresh_manga) upsert_chapters( self.db, fresh_manga.id, fresh_manga.source, fresh_manga.chapters ) # 4. 找出新章节(未下载的) downloaded_ids = get_downloaded_chapter_ids( self.db, fresh_manga.id, fresh_manga.source ) new_chapters = [ ch for ch in fresh_manga.chapters if ch.id not in downloaded_ids ] if not new_chapters: print(f" [OK] {fresh_manga.title}: 无新章节") update_subscription_checked(self.db, manga_id, source_name) return [] print(f" 发现 {len(new_chapters)} 个新章节") new_chapter_ids = [ch.id for ch in new_chapters] # 5. 下载新章节 async with get_source(source_name) as source: engine = DownloadEngine( db=self.db, source=source, config=self.config, base_dir=self.downloads_dir, ) result = await engine.download_manga( fresh_manga.url, chapter_ids=new_chapter_ids, ) # 6. 处理+打包 kepub_paths = self._process_results(result) # 7. 更新订阅状态 last_ch_id = new_chapters[-1].id if new_chapters else None update_subscription_checked( self.db, manga_id, source_name, last_chapter_id=last_ch_id ) return kepub_paths def get_stats(self, manga: MangaInfo) -> dict: """获取漫画的章节统计。""" return get_manga_chapter_stats(self.db, manga.id, manga.source) def _sanitize_filename(name: str) -> str: """清理文件名中的非法字符。""" return "".join( c if c.isalnum() or c in " _-()()【】" else "_" for c in name )