"""CLI 命令定义""" import asyncio import sys from pathlib import Path from typing import Optional import typer from kobo_manga.config import load_config from kobo_manga.db.database import Database from kobo_manga.db.queries import ( find_manga_by_title, get_manga_chapter_stats, list_all_manga, list_subscriptions, subscribe_manga, unsubscribe_manga, ) from kobo_manga.models import MangaInfo from kobo_manga.pipeline import MangaPipeline from kobo_manga.sources import infer_source_from_url from kobo_manga.transfer import get_transfer # Windows GBK 兼容 sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") app = typer.Typer( name="kobo-manga", help="Manga download-convert-transfer pipeline for Kobo e-reader", no_args_is_help=True, ) def _run(coro): """运行异步协程。""" return asyncio.run(coro) def _parse_chapter_range(chapters: str) -> tuple[float, float]: """解析章节范围字符串。 支持格式: "1-10", "5", "0-2.5" """ if "-" in chapters: parts = chapters.split("-", 1) return float(parts[0]), float(parts[1]) num = float(chapters) return num, num def _interactive_select(results: list[MangaInfo]) -> MangaInfo | None: """交互式选择漫画。""" if not results: print("未找到结果") return None print(f"\n找到 {len(results)} 个结果:\n") for i, manga in enumerate(results, 1): author = f" [{manga.author}]" if manga.author else "" source_label = f" ({manga.source})" if manga.source else "" print(f" {i}. {manga.title}{author}{source_label}") if manga.description: desc = manga.description[:60] if len(manga.description) > 60: desc += "..." print(f" {desc}") print() while True: choice = input("选择 (序号, q 退出): ").strip() if choice.lower() == "q": return None try: idx = int(choice) - 1 if 0 <= idx < len(results): return results[idx] except ValueError: pass print("无效输入,请重试") def _sanitize_filename(name: str) -> str: """清理文件名中的非法字符。""" return "".join( c if c.isalnum() or c in " _-()()【】" else "_" for c in name ) # ── 命令 ───────────────────────────────────────────────── @app.command() def search( keyword: str = typer.Argument(help="搜索关键词"), source: Optional[str] = typer.Option( None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)" ), ): """搜索漫画。""" config = load_config() with Database() as db: pipeline = MangaPipeline(config, db) results = _run(pipeline.search(keyword, source_name=source)) selected = _interactive_select(results) if selected is None: raise typer.Exit() # 获取详情 with Database() as db: pipeline = MangaPipeline(config, db) manga = _run(pipeline.get_manga_info( selected.url, source_name=selected.source )) print(f"\n{'='*50}") print(f"标题: {manga.title}") if manga.author: print(f"作者: {manga.author}") if manga.tags: print(f"标签: {', '.join(manga.tags)}") print(f"章节: {len(manga.chapters)} 个") if manga.chapters: first = manga.chapters[0] last = manga.chapters[-1] print(f"范围: {first.title} ~ {last.title}") print(f"URL: {manga.url}") @app.command() def download( target: str = typer.Argument(help="漫画名或 URL"), chapters: Optional[str] = typer.Option( None, "--chapters", "-c", help="章节范围 (如 1-10, 5)" ), source: Optional[str] = typer.Option( None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)" ), chapter_type: Optional[str] = typer.Option( None, "--type", "-t", help="章节类型筛选 (volume/chapter/extra)" ), push: bool = typer.Option( False, "--push", "-p", help="下载后推送到设备" ), ): """下载漫画并转换为 KEPUB。""" config = load_config() chapter_range = _parse_chapter_range(chapters) if chapters else None with Database() as db: pipeline = MangaPipeline(config, db) # 判断是 URL 还是名字 if target.startswith("http"): manga_url = target src = source or infer_source_from_url(target) else: # 搜索并选择 results = _run(pipeline.search(target, source_name=source)) selected = _interactive_select(results) if selected is None: raise typer.Exit() manga_url = selected.url src = selected.source # 全流程: 下载 → 处理 → KEPUB kepub_paths = _run( pipeline.download_and_convert( manga_url, source_name=src, chapter_range=chapter_range, chapter_type=chapter_type, ) ) if not kepub_paths: print("\n没有新的 KEPUB 文件生成") raise typer.Exit() print(f"\n生成 {len(kepub_paths)} 个 KEPUB 文件:") for p in kepub_paths: size_mb = p.stat().st_size / 1024 / 1024 print(f" {p.name} ({size_mb:.1f}MB)") # 推送到设备 if push: _do_push(config, kepub_paths) @app.command(name="list") def list_manga(): """查看本地漫画库。""" with Database() as db: mangas = list_all_manga(db) if not mangas: print("本地库为空,使用 download 命令下载漫画") raise typer.Exit() print(f"\n本地漫画库 ({len(mangas)} 部):\n") for manga in mangas: stats = get_manga_chapter_stats(db, manga.id, manga.source) author = f" [{manga.author}]" if manga.author else "" downloaded = stats["downloaded"] total = stats["total"] print(f" {manga.title}{author}") print(f" 章节: {downloaded}/{total} 已下载 来源: {manga.source}") @app.command() def push( target: str = typer.Argument(help="漫画名"), ): """推送 KEPUB 到设备。""" config = load_config() # 查找 output 目录下的 KEPUB 文件 output_dir = Path("output") / _sanitize_filename(target) if not output_dir.exists(): # 尝试模糊匹配 output 下的目录 output_base = Path("output") if output_base.exists(): matches = [ d for d in output_base.iterdir() if d.is_dir() and target.lower() in d.name.lower() ] if len(matches) == 1: output_dir = matches[0] elif len(matches) > 1: print("匹配到多个目录:") for d in matches: print(f" {d.name}") print("请使用更精确的名称") raise typer.Exit(1) if not output_dir.exists(): print(f"未找到 KEPUB 文件目录: {output_dir}") print("请先使用 download 命令下载并转换") raise typer.Exit(1) kepub_paths = sorted(output_dir.glob("*.kepub.epub")) if not kepub_paths: print(f"目录中无 .kepub.epub 文件: {output_dir}") raise typer.Exit(1) print(f"找到 {len(kepub_paths)} 个 KEPUB 文件") _do_push(config, kepub_paths, manga_title=output_dir.name) @app.command() def config(): """查看当前配置。""" cfg = load_config() print("\n当前配置:\n") print(f"[设备]") print(f" 型号: {cfg.device.model}") print(f" 分辨率: {cfg.device.width}x{cfg.device.height}") print(f" 彩色: {'是' if cfg.device.color else '否'}") print() print(f"[漫画源]") print(f" 启用: {', '.join(cfg.sources)}") print() print(f"[图片处理]") print(f" 双页拆分: {'开' if cfg.processing.split_double_page else '关'}") print(f" 裁白边: {'开' if cfg.processing.crop_whitespace else '关'}") print(f" 缩放: {'开' if cfg.processing.resize else '关'}") print(f" 灰度: {'开' if cfg.processing.grayscale else '关'}") print(f" 对比度增强: {'开' if cfg.processing.enhance_contrast else '关'}") print(f" 对比度系数: {cfg.processing.contrast_factor}") print() print(f"[下载]") print(f" 并发数: {cfg.download.concurrent}") print(f" 重试次数: {cfg.download.retry}") print(f" 延迟: {cfg.download.delay}s") print() print(f"[传输]") print(f" 方式: {cfg.transfer.method}") if cfg.transfer.method == "calibre": print(f" Calibre: {cfg.transfer.calibre_host}:{cfg.transfer.calibre_port}") @app.command() def subscribe( target: str = typer.Argument(help="漫画名或 URL"), source: Optional[str] = typer.Option( None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)" ), auto_push: bool = typer.Option( False, "--auto-push", help="新章节自动推送到设备" ), ): """订阅漫画追更。""" config = load_config() with Database() as db: pipeline = MangaPipeline(config, db) # 获取漫画信息 if target.startswith("http"): src = source or infer_source_from_url(target) manga = _run(pipeline.get_manga_info(target, source_name=src)) else: results = _run(pipeline.search(target, source_name=source)) selected = _interactive_select(results) if selected is None: raise typer.Exit() manga = _run(pipeline.get_manga_info( selected.url, source_name=selected.source )) # 订阅 subscribe_manga(db, manga.id, manga.source, auto_push) print(f"\n[OK] 已订阅: {manga.title}") print(f" 来源: {manga.source}") print(f" 章节数: {len(manga.chapters)}") if auto_push: print(" 自动推送: 开") @app.command() def unsubscribe( target: str = typer.Argument(help="漫画名"), ): """取消订阅。""" with Database() as db: # 在本地库搜索 mangas = find_manga_by_title(db, target) if not mangas: print(f"未找到匹配的漫画: {target}") raise typer.Exit(1) if len(mangas) == 1: manga = mangas[0] else: manga = _interactive_select(mangas) if manga is None: raise typer.Exit() removed = unsubscribe_manga(db, manga.id, manga.source) if removed: print(f"[OK] 已取消订阅: {manga.title}") else: print(f"未订阅该漫画: {manga.title}") @app.command() def subscriptions(): """查看订阅列表。""" with Database() as db: subs = list_subscriptions(db) if not subs: print("暂无订阅,使用 subscribe 命令添加") raise typer.Exit() print(f"\n订阅列表 ({len(subs)} 部):\n") for sub in subs: title = sub["title"] author = f" [{sub['author']}]" if sub.get("author") else "" downloaded = sub["downloaded_chapters"] total = sub["total_chapters"] last_checked = sub["last_checked"] or "从未" auto_push = "开" if sub["auto_push"] else "关" print(f" {title}{author}") print(f" 来源: {sub['source']} 章节: {downloaded}/{total}") print(f" 最后检查: {last_checked} 自动推送: {auto_push}") @app.command() def update( push: bool = typer.Option( False, "--push", "-p", help="下载后推送到设备" ), ): """检查所有订阅的更新。""" config = load_config() with Database() as db: subs = list_subscriptions(db) if not subs: print("暂无订阅,使用 subscribe 命令添加") raise typer.Exit() print(f"检查 {len(subs)} 个订阅的更新...\n") pipeline = MangaPipeline(config, db) all_kepubs: list[Path] = [] for sub in subs: title = sub["title"] print(f"[{title}]") kepubs = _run( pipeline.check_and_download_updates( sub["manga_id"], sub["source"] ) ) all_kepubs.extend(kepubs) if all_kepubs: print(f"\n共 {len(all_kepubs)} 个新 KEPUB:") for p in all_kepubs: size_mb = p.stat().st_size / 1024 / 1024 print(f" {p.name} ({size_mb:.1f}MB)") if push: _do_push(config, all_kepubs) else: print("\n无新章节") @app.command() def daemon(): """启动守护进程,定期检查订阅更新。""" from kobo_manga.scheduler.daemon import UpdateScheduler config = load_config() with Database() as db: subs = list_subscriptions(db) if not subs: print("暂无订阅,使用 subscribe 命令添加") raise typer.Exit() interval = config.scheduler.interval print(f"守护进程启动: {len(subs)} 个订阅,间隔 {interval}s") print("按 Ctrl+C 退出\n") scheduler = UpdateScheduler(config, db) try: asyncio.run(scheduler.run_forever()) except KeyboardInterrupt: print("\n\n[OK] 守护进程已停止") def _do_push(config, kepub_paths: list[Path], manga_title: str = ""): """执行传输。""" print(f"\n传输 {len(kepub_paths)} 个文件 (方式: {config.transfer.method})...") try: transfer = get_transfer(config.transfer) if hasattr(transfer, "transfer"): if config.transfer.method == "usb": dest_paths = transfer.transfer(kepub_paths, manga_title) else: dest_paths = transfer.transfer(kepub_paths) print(f"[OK] 传输完成: {len(dest_paths)} 个文件") for p in dest_paths: print(f" -> {p}") except RuntimeError as e: print(f"[!] 传输失败: {e}") raise typer.Exit(1)