"""多章节下载引擎
在 basic.py 的单章节下载基础上,增加:
- 多章节批量下载编排
- SQLite 状态追踪(去重、断点续传)
- 章节范围选择
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kobo_manga.sources.base import BaseSource
from kobo_manga.config import AppConfig
from kobo_manga.db.database import Database
from kobo_manga.db.queries import (
are_all_pages_downloaded,
get_downloaded_chapter_ids,
get_pending_pages,
mark_page_downloaded,
mark_page_failed,
set_chapter_status,
upsert_chapters,
upsert_manga,
upsert_pages,
)
from kobo_manga.downloader.basic import download_chapter as _download_chapter_basic
from kobo_manga.models import (
Chapter,
ChapterResult,
DownloadResult,
MangaInfo,
)
class DownloadEngine:
    """Multi-chapter download orchestrator with SQLite state tracking.

    Builds on the single-chapter downloader by persisting manga, chapter and
    page state through the query helpers, which lets it skip chapters that
    are already downloaded and resume chapters that were only partly fetched.
    """

    def __init__(
        self,
        db: Database,
        source: "BaseSource",  # manga source adapter
        config: AppConfig,
        base_dir: Path = Path("downloads"),
    ):
        """Store the collaborators shared by every download.

        Args:
            db: Database handle passed to the query helpers.
            source: Source adapter used to fetch manga info and chapter images.
            config: Application config forwarded to the basic downloader.
            base_dir: Root directory under which chapter folders are placed.
        """
        self.db = db
        self.source = source
        self.config = config
        self.base_dir = base_dir

    async def download_manga(
        self,
        manga_id_or_url: str,
        chapter_range: tuple[float, float] | None = None,
        chapter_ids: list[str] | None = None,
        chapter_type: str | None = None,
    ) -> DownloadResult:
        """Download several chapters of one manga.

        Args:
            manga_id_or_url: Manga URL or ID understood by the source adapter.
            chapter_range: (first chapter number, last chapter number),
                inclusive on both ends.
            chapter_ids: Explicit chapter IDs to download. When given, it
                takes precedence over ``chapter_range``.
            chapter_type: Optional chapter type filter (volume/chapter/extra).

        Returns:
            A DownloadResult summarising downloaded, skipped and failed
            chapters, with one ChapterResult per attempted chapter.
        """
        # 1. Fetch manga info together with its chapter list.
        print(f"获取漫画信息: {manga_id_or_url}")
        manga = await self.source.get_manga_info(manga_id_or_url)
        print(f" {manga.title} - 共 {len(manga.chapters)} 个章节")

        # 2. Persist to the database.
        upsert_manga(self.db, manga)
        upsert_chapters(self.db, manga.id, manga.source, manga.chapters)

        # 3. Dedup: look up chapters that are already downloaded.
        done_ids = get_downloaded_chapter_ids(self.db, manga.id, manga.source)

        # 4. Pick the chapters to download.
        selected = self._select_chapters(
            manga.chapters, chapter_range, chapter_ids, chapter_type
        )
        to_download = [ch for ch in selected if ch.id not in done_ids]
        skipped = len(selected) - len(to_download)
        print(
            f" 选中 {len(selected)} 个章节,"
            f"跳过 {skipped} 个已下载,"
            f"待下载 {len(to_download)} 个"
        )

        # 5. Download chapter by chapter, sequentially.
        result = DownloadResult(
            manga=manga,
            chapters_total=len(selected),
            chapters_skipped=skipped,
        )
        for i, chapter in enumerate(to_download, 1):
            print(f"\n[{i}/{len(to_download)}] {chapter.title}")
            ch_result = await self.download_chapter(manga, chapter)
            result.chapter_results.append(ch_result)
            # Any status other than "downloaded" (including "partial")
            # counts as a failed chapter in the summary.
            if ch_result.status == "downloaded":
                result.chapters_downloaded += 1
            else:
                result.chapters_failed += 1

        # Summary.
        print(f"\n{'='*50}")
        print(
            f"完成: {result.chapters_downloaded} 下载 / "
            f"{result.chapters_skipped} 跳过 / "
            f"{result.chapters_failed} 失败"
        )
        return result

    async def download_chapter(
        self, manga: MangaInfo, chapter: Chapter
    ) -> ChapterResult:
        """Download a single chapter while tracking its state in the database.

        Args:
            manga: Manga the chapter belongs to; supplies the source name and
                the title used for the output directory.
            chapter: Chapter to download.

        Returns:
            A ChapterResult whose status is "downloaded" when every page is
            present, "partial" when some pages failed, or "failed" when an
            exception occurred or the source returned no images.
        """
        source_name = manga.source

        # Mark the chapter as in progress.
        set_chapter_status(self.db, chapter.id, source_name, "downloading")

        try:
            # Fetch the image list.
            images = await self.source.get_chapter_images(chapter)
            if not images:
                # With no images nothing would be pending, and the chapter
                # would be recorded as "downloaded" with zero pages, so the
                # dedup step would skip it on every later run. Treat it as a
                # failure instead (handled by the except block below).
                raise RuntimeError("未获取到任何图片")
            print(f" 共 {len(images)} 页")

            # Record the pages in the database.
            upsert_pages(self.db, chapter.id, source_name, images)

            # Resume support: only pages not yet completed are downloaded.
            pending = get_pending_pages(self.db, chapter.id, source_name)
            total_pages = len(images)
            output_dir = self._chapter_dir(manga, chapter)

            if not pending:
                # Every page is already downloaded.
                set_chapter_status(
                    self.db, chapter.id, source_name,
                    "downloaded", str(output_dir),
                )
                print(" 所有页面已存在,跳过")
                return ChapterResult(
                    chapter=chapter,
                    status="downloaded",
                    pages_total=total_pages,
                    pages_downloaded=total_pages,
                    download_path=output_dir,
                )

            already_done = total_pages - len(pending)
            if already_done > 0:
                print(f" 续传: {already_done} 页已存在,下载剩余 {len(pending)} 页")

            # Delegate the actual transfer to the basic downloader.
            downloaded = await _download_chapter_basic(
                pending, output_dir, chapter.url, self.config
            )
            newly_done = self._record_page_results(
                chapter, source_name, pending, downloaded
            )

            # Check whether the chapter is now complete.
            all_done = are_all_pages_downloaded(
                self.db, chapter.id, source_name
            )
            pages_downloaded = already_done + newly_done

            if all_done:
                set_chapter_status(
                    self.db, chapter.id, source_name,
                    "downloaded", str(output_dir),
                )
                status = "downloaded"
                print(f" 完成: {pages_downloaded}/{total_pages} 页")
            else:
                # The database records "failed" while the caller sees
                # "partial", since some pages did succeed.
                set_chapter_status(
                    self.db, chapter.id, source_name, "failed"
                )
                status = "partial"
                print(
                    f" 部分失败: {pages_downloaded}/{total_pages} 页"
                )

            return ChapterResult(
                chapter=chapter,
                status=status,
                pages_total=total_pages,
                pages_downloaded=pages_downloaded,
                download_path=output_dir,
            )

        except Exception as e:
            # Best effort: one broken chapter must not abort the whole batch.
            set_chapter_status(
                self.db, chapter.id, source_name, "failed"
            )
            print(f" 下载失败: {e}")
            return ChapterResult(
                chapter=chapter,
                status="failed",
                pages_total=0,
                pages_downloaded=0,
            )

    def _record_page_results(
        self, chapter: Chapter, source_name: str, pending, downloaded
    ) -> int:
        """Persist per-page outcomes and return the number of pages saved.

        Args:
            chapter: Chapter the pages belong to.
            source_name: Source name used as part of the database key.
            pending: Pages that were handed to the basic downloader.
            downloaded: Pages returned by the basic downloader; only those
                with a truthy ``local_path`` are treated as saved.

        Returns:
            Count of distinct page numbers marked as downloaded.
        """
        downloaded_nums = set()
        for page in downloaded:
            if page.local_path:
                mark_page_downloaded(
                    self.db, chapter.id, source_name,
                    page.page_number, page.local_path,
                )
                downloaded_nums.add(page.page_number)

        # Every requested page that did not come back saved is a failure.
        for page in pending:
            if page.page_number not in downloaded_nums:
                mark_page_failed(
                    self.db, chapter.id, source_name, page.page_number
                )
        return len(downloaded_nums)

    async def resume_incomplete(self, manga_id_or_url: str) -> DownloadResult:
        """Resume unfinished downloads.

        Simply re-runs ``download_manga`` with no filters; its dedup step
        skips chapters that are already downloaded.
        """
        return await self.download_manga(manga_id_or_url)

    def _select_chapters(
        self,
        chapters: list[Chapter],
        chapter_range: tuple[float, float] | None,
        chapter_ids: list[str] | None,
        chapter_type: str | None = None,
    ) -> list[Chapter]:
        """Filter chapters by type, then by explicit IDs or number range.

        Args:
            chapters: Candidate chapters, in the order to preserve.
            chapter_range: Inclusive (start, end) chapter-number bounds.
            chapter_ids: Explicit chapter IDs; wins over ``chapter_range``
                when both are given.
            chapter_type: If not None, keep only chapters of this type.

        Returns:
            A new list with the matching chapters in their original order.
        """
        # Type filter is applied first and combines with the other filters.
        if chapter_type is not None:
            chapters = [ch for ch in chapters if ch.chapter_type == chapter_type]

        if chapter_ids is not None:
            id_set = set(chapter_ids)
            return [ch for ch in chapters if ch.id in id_set]

        if chapter_range is not None:
            start, end = chapter_range
            # Defensive: a chapter without a number cannot be compared with
            # the bounds (None raises TypeError), so it is left out of a
            # range selection instead of aborting the whole run.
            return [
                ch for ch in chapters
                if ch.chapter_number is not None
                and start <= ch.chapter_number <= end
            ]

        # No range given: select everything (already filtered by type).
        return list(chapters)

    def _chapter_dir(self, manga: MangaInfo, chapter: Chapter) -> Path:
        """Return the directory for a chapter: base_dir / manga / chapter.

        Both titles are passed through ``_sanitize_filename`` first.
        """
        safe_manga = _sanitize_filename(manga.title)
        safe_chapter = _sanitize_filename(chapter.title)
        return self.base_dir / safe_manga / safe_chapter
def _sanitize_filename(name: str) -> str:
"""清理文件名中的非法字符。"""
return "".join(
c if c.isalnum() or c in " _-()()【】" else "_" for c in name
)