"""多章节下载引擎 在 basic.py 的单章节下载基础上,增加: - 多章节批量下载编排 - SQLite 状态追踪(去重、断点续传) - 章节范围选择 """ from __future__ import annotations from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from kobo_manga.sources.base import BaseSource from kobo_manga.config import AppConfig from kobo_manga.db.database import Database from kobo_manga.db.queries import ( are_all_pages_downloaded, get_downloaded_chapter_ids, get_pending_pages, mark_page_downloaded, mark_page_failed, set_chapter_status, upsert_chapters, upsert_manga, upsert_pages, ) from kobo_manga.downloader.basic import download_chapter as _download_chapter_basic from kobo_manga.models import ( Chapter, ChapterResult, DownloadResult, MangaInfo, ) class DownloadEngine: """多章节下载编排器,带 SQLite 状态追踪。""" def __init__( self, db: Database, source: "BaseSource", # 漫画源适配器 config: AppConfig, base_dir: Path = Path("downloads"), ): self.db = db self.source = source self.config = config self.base_dir = base_dir async def download_manga( self, manga_id_or_url: str, chapter_range: tuple[float, float] | None = None, chapter_ids: list[str] | None = None, chapter_type: str | None = None, ) -> DownloadResult: """下载漫画的多个章节。 Args: manga_id_or_url: 漫画 URL 或 ID chapter_range: (起始章节号, 结束章节号),闭区间 chapter_ids: 指定章节 ID 列表(与 chapter_range 二选一) chapter_type: 章节类型筛选 (volume/chapter/extra) Returns: DownloadResult 汇总结果 """ # 1. 获取漫画信息和章节列表 print(f"获取漫画信息: {manga_id_or_url}") manga = await self.source.get_manga_info(manga_id_or_url) print(f" {manga.title} - 共 {len(manga.chapters)} 个章节") # 2. 持久化到数据库 upsert_manga(self.db, manga) upsert_chapters(self.db, manga.id, manga.source, manga.chapters) # 3. 去重:获取已下载的章节 done_ids = get_downloaded_chapter_ids(self.db, manga.id, manga.source) # 4. 筛选要下载的章节 selected = self._select_chapters( manga.chapters, chapter_range, chapter_ids, chapter_type ) to_download = [ch for ch in selected if ch.id not in done_ids] skipped = len(selected) - len(to_download) print( f" 选中 {len(selected)} 个章节," f"跳过 {skipped} 个已下载," f"待下载 {len(to_download)} 个" ) # 5. 逐章下载 result = DownloadResult( manga=manga, chapters_total=len(selected), chapters_skipped=skipped, ) for i, chapter in enumerate(to_download, 1): print(f"\n[{i}/{len(to_download)}] {chapter.title}") ch_result = await self.download_chapter(manga, chapter) result.chapter_results.append(ch_result) if ch_result.status == "downloaded": result.chapters_downloaded += 1 else: result.chapters_failed += 1 # 汇总 print(f"\n{'='*50}") print( f"完成: {result.chapters_downloaded} 下载 / " f"{result.chapters_skipped} 跳过 / " f"{result.chapters_failed} 失败" ) return result async def download_chapter( self, manga: MangaInfo, chapter: Chapter ) -> ChapterResult: """下载单个章节,带状态追踪。""" source_name = manga.source # 标记为下载中 set_chapter_status(self.db, chapter.id, source_name, "downloading") try: # 获取图片列表 images = await self.source.get_chapter_images(chapter) print(f" 共 {len(images)} 页") # 记录页面到数据库 upsert_pages(self.db, chapter.id, source_name, images) # 断点续传:只下载未完成的页 pending = get_pending_pages(self.db, chapter.id, source_name) total_pages = len(images) if not pending: # 所有页面已下载 output_dir = self._chapter_dir(manga, chapter) set_chapter_status( self.db, chapter.id, source_name, "downloaded", str(output_dir), ) print(f" 所有页面已存在,跳过") return ChapterResult( chapter=chapter, status="downloaded", pages_total=total_pages, pages_downloaded=total_pages, download_path=output_dir, ) already_done = total_pages - len(pending) if already_done > 0: print(f" 续传: {already_done} 页已存在,下载剩余 {len(pending)} 页") # 调用基础下载器 output_dir = self._chapter_dir(manga, chapter) downloaded = await _download_chapter_basic( pending, output_dir, chapter.url, self.config ) # 更新页面状态 downloaded_nums = set() for page in downloaded: if page.local_path: mark_page_downloaded( self.db, chapter.id, source_name, page.page_number, page.local_path, ) downloaded_nums.add(page.page_number) # 标记失败的页面 for page in pending: if page.page_number not in downloaded_nums: mark_page_failed( self.db, chapter.id, source_name, page.page_number ) # 检查是否全部完成 all_done = are_all_pages_downloaded( self.db, chapter.id, source_name ) pages_downloaded = already_done + len(downloaded_nums) if all_done: set_chapter_status( self.db, chapter.id, source_name, "downloaded", str(output_dir), ) status = "downloaded" print(f" 完成: {pages_downloaded}/{total_pages} 页") else: set_chapter_status( self.db, chapter.id, source_name, "failed" ) status = "partial" print( f" 部分失败: {pages_downloaded}/{total_pages} 页" ) return ChapterResult( chapter=chapter, status=status, pages_total=total_pages, pages_downloaded=pages_downloaded, download_path=output_dir, ) except Exception as e: set_chapter_status( self.db, chapter.id, source_name, "failed" ) print(f" 下载失败: {e}") return ChapterResult( chapter=chapter, status="failed", pages_total=0, pages_downloaded=0, ) async def resume_incomplete(self, manga_id_or_url: str) -> DownloadResult: """恢复未完成的下载。重新运行 download_manga 即可,去重逻辑会自动跳过已完成的。""" return await self.download_manga(manga_id_or_url) def _select_chapters( self, chapters: list[Chapter], chapter_range: tuple[float, float] | None, chapter_ids: list[str] | None, chapter_type: str | None = None, ) -> list[Chapter]: """按范围、ID 或类型筛选章节。""" # 先按类型过滤 if chapter_type is not None: chapters = [ch for ch in chapters if ch.chapter_type == chapter_type] if chapter_ids is not None: id_set = set(chapter_ids) return [ch for ch in chapters if ch.id in id_set] if chapter_range is not None: start, end = chapter_range return [ ch for ch in chapters if start <= ch.chapter_number <= end ] # 未指定范围则选择全部(已按类型过滤) return list(chapters) def _chapter_dir(self, manga: MangaInfo, chapter: Chapter) -> Path: """计算章节下载目录。""" safe_manga = _sanitize_filename(manga.title) safe_chapter = _sanitize_filename(chapter.title) return self.base_dir / safe_manga / safe_chapter def _sanitize_filename(name: str) -> str: """清理文件名中的非法字符。""" return "".join( c if c.isalnum() or c in " _-()()【】" else "_" for c in name )