"""CLI 命令定义"""
import asyncio
import sys
from pathlib import Path
from typing import Optional
import typer
from kobo_manga.config import load_config
from kobo_manga.db.database import Database
from kobo_manga.db.queries import (
find_manga_by_title,
get_manga_chapter_stats,
list_all_manga,
list_subscriptions,
subscribe_manga,
unsubscribe_manga,
)
from kobo_manga.models import MangaInfo
from kobo_manga.pipeline import MangaPipeline
from kobo_manga.sources import infer_source_from_url
from kobo_manga.transfer import get_transfer
# Windows consoles often default to GBK, which cannot encode every character
# this CLI prints; force UTF-8 and replace anything still unencodable.
def _force_utf8_output(*streams) -> None:
    """Switch each text stream to UTF-8 with ``errors="replace"``.

    Streams that are ``None`` or that do not provide ``reconfigure()``
    (for example an ``io.StringIO`` installed by a caller that redirects
    output) are skipped, so importing this module never fails because of
    how stdout/stderr happen to be wired up.
    """
    for stream in streams:
        reconfigure = getattr(stream, "reconfigure", None)
        if reconfigure is not None:
            reconfigure(encoding="utf-8", errors="replace")


_force_utf8_output(sys.stdout, sys.stderr)
# Root Typer application; each @app.command() in this module registers a
# sub-command on it. Running it with no arguments prints the help screen.
app = typer.Typer(
    no_args_is_help=True,
    name="kobo-manga",
    help="Manga download-convert-transfer pipeline for Kobo e-reader",
)
def _run(coro):
    """Run *coro* to completion via ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome
def _parse_chapter_range(chapters: str) -> tuple[float, float]:
    """Parse a chapter selector into an inclusive ``(start, end)`` pair.

    Accepted forms are a single number ("5") or two numbers joined by a
    hyphen ("1-10", "0-2.5"); surrounding whitespace is ignored. A single
    number yields ``(n, n)``. Bounds given in descending order ("10-1") are
    swapped so the result always satisfies ``start <= end``.

    Raises:
        ValueError: if the text is empty, a bound is missing ("-5", "3-"),
            or a bound cannot be parsed as a number. The message quotes the
            offending input.
    """
    text = chapters.strip()
    # Split on the first hyphen only, matching the "start-end" form.
    first, hyphen, second = text.partition("-")
    bounds = (first, second) if hyphen else (first, first)
    try:
        start, end = (float(part) for part in bounds)
    except ValueError:
        # float()'s own message ("could not convert string to float: ''")
        # does not tell the user which option was wrong or what is expected.
        raise ValueError(
            f"invalid chapter range {chapters!r}: expected a number or "
            "'start-end', e.g. '5' or '1-10'"
        ) from None
    if start > end:
        # A reversed range would otherwise describe an empty interval.
        start, end = end, start
    return start, end
def _interactive_select(results: list[MangaInfo]) -> MangaInfo | None:
    """Print a numbered list of *results* and ask the user to pick one.

    Each entry shows the title plus author and source when present, followed
    by the description truncated to 60 characters. The prompt repeats until
    the user enters a valid 1-based index or ``q``.

    Returns:
        The chosen entry, or ``None`` when *results* is empty, the user
        enters ``q``, or standard input reaches end-of-file.
    """
    if not results:
        print("未找到结果")
        return None

    print(f"\n找到 {len(results)} 个结果:\n")
    for i, manga in enumerate(results, 1):
        author = f" [{manga.author}]" if manga.author else ""
        source_label = f" ({manga.source})" if manga.source else ""
        print(f" {i}. {manga.title}{author}{source_label}")
        if manga.description:
            desc = manga.description[:60]
            if len(manga.description) > 60:
                desc += "..."
            print(f" {desc}")
    print()

    while True:
        try:
            choice = input("选择 (序号, q 退出): ").strip()
        except EOFError:
            # No more input can arrive (closed or piped stdin): treat it
            # like quitting instead of crashing with a traceback.
            print()
            return None
        if choice.lower() == "q":
            return None
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1  # not a number; falls through to the retry message
        if 0 <= idx < len(results):
            return results[idx]
        print("无效输入,请重试")
def _sanitize_filename(name: str) -> str:
    """Return *name* with every disallowed character replaced by ``_``.

    Alphanumeric characters (per ``str.isalnum``) and the punctuation listed
    in ``allowed`` are kept unchanged.
    """
    allowed = " _-()()【】"
    cleaned = []
    for ch in name:
        keep = ch.isalnum() or ch in allowed
        cleaned.append(ch if keep else "_")
    return "".join(cleaned)
# ── 命令 ─────────────────────────────────────────────────
@app.command()
def search(
    keyword: str = typer.Argument(help="搜索关键词"),
    source: Optional[str] = typer.Option(
        None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)"
    ),
):
    """搜索漫画。"""
    config = load_config()

    # Step 1: run the search, then let the user pick one of the hits.
    with Database() as db:
        hits = _run(MangaPipeline(config, db).search(keyword, source_name=source))
    picked = _interactive_select(hits)
    if picked is None:
        raise typer.Exit()

    # Step 2: fetch the full record for the picked entry and print a summary.
    with Database() as db:
        fetcher = MangaPipeline(config, db)
        manga = _run(
            fetcher.get_manga_info(picked.url, source_name=picked.source)
        )

        print("\n" + "=" * 50)
        print(f"标题: {manga.title}")
        if manga.author:
            print(f"作者: {manga.author}")
        if manga.tags:
            print(f"标签: {', '.join(manga.tags)}")
        print(f"章节: {len(manga.chapters)} 个")
        if manga.chapters:
            # Show the first and last chapter titles as the available range.
            earliest, latest = manga.chapters[0], manga.chapters[-1]
            print(f"范围: {earliest.title} ~ {latest.title}")
        print(f"URL: {manga.url}")
@app.command()
def download(
    target: str = typer.Argument(help="漫画名或 URL"),
    chapters: Optional[str] = typer.Option(
        None, "--chapters", "-c", help="章节范围 (如 1-10, 5)"
    ),
    source: Optional[str] = typer.Option(
        None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)"
    ),
    chapter_type: Optional[str] = typer.Option(
        None, "--type", "-t", help="章节类型筛选 (volume/chapter/extra)"
    ),
    push: bool = typer.Option(
        False, "--push", "-p", help="下载后推送到设备"
    ),
):
    """下载漫画并转换为 KEPUB。"""
    config = load_config()

    chapter_range = None
    if chapters:
        try:
            chapter_range = _parse_chapter_range(chapters)
        except ValueError as err:
            # Report a malformed --chapters value as a usage error rather
            # than letting the ValueError escape as a traceback.
            raise typer.BadParameter(
                f"无法解析章节范围: {chapters!r} (示例: 1-10, 5)",
                param_hint="--chapters",
            ) from err

    with Database() as db:
        pipeline = MangaPipeline(config, db)

        # Anything starting with "http" is treated as a manga URL; every
        # other target is a title to search for interactively.
        if target.startswith("http"):
            manga_url = target
            src = source or infer_source_from_url(target)
        else:
            results = _run(pipeline.search(target, source_name=source))
            selected = _interactive_select(results)
            if selected is None:
                raise typer.Exit()
            manga_url = selected.url
            src = selected.source

        # Full pipeline: download -> process -> KEPUB.
        kepub_paths = _run(
            pipeline.download_and_convert(
                manga_url,
                source_name=src,
                chapter_range=chapter_range,
                chapter_type=chapter_type,
            )
        )
        if not kepub_paths:
            print("\n没有新的 KEPUB 文件生成")
            raise typer.Exit()

        print(f"\n生成 {len(kepub_paths)} 个 KEPUB 文件:")
        for p in kepub_paths:
            size_mb = p.stat().st_size / 1024 / 1024
            print(f" {p.name} ({size_mb:.1f}MB)")

        # Optional final step: send the generated files to the device.
        if push:
            _do_push(config, kepub_paths)
@app.command(name="list")
def list_manga():
    """查看本地漫画库。"""
    with Database() as db:
        library = list_all_manga(db)
        if not library:
            print("本地库为空,使用 download 命令下载漫画")
            raise typer.Exit()

        print(f"\n本地漫画库 ({len(library)} 部):\n")
        for entry in library:
            # One stats lookup per manga, keyed by (id, source).
            counts = get_manga_chapter_stats(db, entry.id, entry.source)
            byline = f" [{entry.author}]" if entry.author else ""
            done, total = counts["downloaded"], counts["total"]
            print(f" {entry.title}{byline}")
            print(f" 章节: {done}/{total} 已下载 来源: {entry.source}")
@app.command()
def push(
    target: str = typer.Argument(help="漫画名"),
):
    """推送 KEPUB 到设备。"""
    config = load_config()

    # KEPUB files are looked up under output/<sanitized title>.
    output_base = Path("output")
    safe_name = _sanitize_filename(target)
    output_dir = output_base / safe_name

    if not output_dir.is_dir() and output_base.is_dir():
        # No exact match: fall back to a case-insensitive substring search.
        # Both the raw and the sanitized title are tried because the exact
        # lookup above implies directory names are sanitized, so a title
        # containing replaced characters would never match in raw form.
        needles = {target.lower(), safe_name.lower()}
        matches = sorted(
            d for d in output_base.iterdir()
            if d.is_dir() and any(n in d.name.lower() for n in needles)
        )
        if len(matches) == 1:
            output_dir = matches[0]
        elif len(matches) > 1:
            print("匹配到多个目录:")
            for d in matches:
                print(f" {d.name}")
            print("请使用更精确的名称")
            raise typer.Exit(1)

    # is_dir() rather than exists(): a regular file with that name is not
    # a usable KEPUB directory.
    if not output_dir.is_dir():
        print(f"未找到 KEPUB 文件目录: {output_dir}")
        print("请先使用 download 命令下载并转换")
        raise typer.Exit(1)

    kepub_paths = sorted(output_dir.glob("*.kepub.epub"))
    if not kepub_paths:
        print(f"目录中无 .kepub.epub 文件: {output_dir}")
        raise typer.Exit(1)

    print(f"找到 {len(kepub_paths)} 个 KEPUB 文件")
    _do_push(config, kepub_paths, manga_title=output_dir.name)
@app.command()
def config():
    """查看当前配置。"""
    cfg = load_config()

    def _switch(flag) -> str:
        # Render a truthy/falsy setting as the on/off label used below.
        return "开" if flag else "关"

    print("\n当前配置:\n")

    # Device section.
    print("[设备]")
    device = cfg.device
    print(f" 型号: {device.model}")
    print(f" 分辨率: {device.width}x{device.height}")
    print(f" 彩色: {'是' if device.color else '否'}")
    print()

    # Enabled sources.
    print("[漫画源]")
    print(f" 启用: {', '.join(cfg.sources)}")
    print()

    # Image-processing toggles, printed in a fixed order.
    print("[图片处理]")
    processing = cfg.processing
    toggles = (
        ("双页拆分", "split_double_page"),
        ("裁白边", "crop_whitespace"),
        ("缩放", "resize"),
        ("灰度", "grayscale"),
        ("对比度增强", "enhance_contrast"),
    )
    for label, attr in toggles:
        print(f" {label}: {_switch(getattr(processing, attr))}")
    print(f" 对比度系数: {processing.contrast_factor}")
    print()

    # Download settings.
    print("[下载]")
    dl = cfg.download
    print(f" 并发数: {dl.concurrent}")
    print(f" 重试次数: {dl.retry}")
    print(f" 延迟: {dl.delay}s")
    print()

    # Transfer settings; the Calibre endpoint is shown only for that method.
    print("[传输]")
    xfer = cfg.transfer
    print(f" 方式: {xfer.method}")
    if xfer.method == "calibre":
        print(f" Calibre: {xfer.calibre_host}:{xfer.calibre_port}")
@app.command()
def subscribe(
    target: str = typer.Argument(help="漫画名或 URL"),
    source: Optional[str] = typer.Option(
        None, "--source", "-s", help="指定漫画源 (如 manhuagui, mangadex)"
    ),
    auto_push: bool = typer.Option(
        False, "--auto-push", help="新章节自动推送到设备"
    ),
):
    """订阅漫画追更。"""
    config = load_config()
    with Database() as db:
        pipeline = MangaPipeline(config, db)

        # Resolve the target to a full manga record: titles go through an
        # interactive search, URLs are fetched directly.
        if not target.startswith("http"):
            hits = _run(pipeline.search(target, source_name=source))
            picked = _interactive_select(hits)
            if picked is None:
                raise typer.Exit()
            lookup = pipeline.get_manga_info(
                picked.url, source_name=picked.source
            )
            manga = _run(lookup)
        else:
            src = source or infer_source_from_url(target)
            manga = _run(pipeline.get_manga_info(target, source_name=src))

        # Record the subscription, then confirm it to the user.
        subscribe_manga(db, manga.id, manga.source, auto_push)
        print(f"\n[OK] 已订阅: {manga.title}")
        print(f" 来源: {manga.source}")
        print(f" 章节数: {len(manga.chapters)}")
        if auto_push:
            print(" 自动推送: 开")
@app.command()
def unsubscribe(
    target: str = typer.Argument(help="漫画名"),
):
    """取消订阅。"""
    with Database() as db:
        # Look the title up in the local library only.
        candidates = find_manga_by_title(db, target)
        if not candidates:
            print(f"未找到匹配的漫画: {target}")
            raise typer.Exit(1)

        # Ask the user to disambiguate only when several titles match.
        if len(candidates) > 1:
            manga = _interactive_select(candidates)
            if manga is None:
                raise typer.Exit()
        else:
            manga = candidates[0]

        if unsubscribe_manga(db, manga.id, manga.source):
            print(f"[OK] 已取消订阅: {manga.title}")
        else:
            print(f"未订阅该漫画: {manga.title}")
@app.command()
def subscriptions():
    """查看订阅列表。"""
    with Database() as db:
        rows = list_subscriptions(db)
        if not rows:
            print("暂无订阅,使用 subscribe 命令添加")
            raise typer.Exit()

        print(f"\n订阅列表 ({len(rows)} 部):\n")
        for sub in rows:
            # Pull every field out first, then print the three-line entry.
            name = sub["title"]
            byline = f" [{sub['author']}]" if sub.get("author") else ""
            done = sub["downloaded_chapters"]
            total = sub["total_chapters"]
            checked = sub["last_checked"] or "从未"
            push_state = "开" if sub["auto_push"] else "关"
            print(f" {name}{byline}")
            print(f" 来源: {sub['source']} 章节: {done}/{total}")
            print(f" 最后检查: {checked} 自动推送: {push_state}")
@app.command()
def update(
    push: bool = typer.Option(
        False, "--push", "-p", help="下载后推送到设备"
    ),
):
    """检查所有订阅的更新。"""
    config = load_config()
    with Database() as db:
        subs = list_subscriptions(db)
        if not subs:
            print("暂无订阅,使用 subscribe 命令添加")
            raise typer.Exit()

        print(f"检查 {len(subs)} 个订阅的更新...\n")
        pipeline = MangaPipeline(config, db)

        # Check subscriptions one at a time, collecting every new KEPUB.
        all_kepubs: list[Path] = []
        for sub in subs:
            print(f"[{sub['title']}]")
            fresh = _run(
                pipeline.check_and_download_updates(
                    sub["manga_id"], sub["source"]
                )
            )
            all_kepubs.extend(fresh)

        if not all_kepubs:
            print("\n无新章节")
            return

        print(f"\n共 {len(all_kepubs)} 个新 KEPUB:")
        for kepub in all_kepubs:
            megabytes = kepub.stat().st_size / 1024 / 1024
            print(f" {kepub.name} ({megabytes:.1f}MB)")
        if push:
            _do_push(config, all_kepubs)
@app.command()
def daemon():
    """启动守护进程,定期检查订阅更新。"""
    # Imported here rather than at module level, as in the original.
    from kobo_manga.scheduler.daemon import UpdateScheduler

    config = load_config()
    with Database() as db:
        active = list_subscriptions(db)
        if not active:
            print("暂无订阅,使用 subscribe 命令添加")
            raise typer.Exit()

        period = config.scheduler.interval
        print(f"守护进程启动: {len(active)} 个订阅,间隔 {period}s")
        print("按 Ctrl+C 退出\n")

        runner = UpdateScheduler(config, db)
        try:
            asyncio.run(runner.run_forever())
        except KeyboardInterrupt:
            # Ctrl+C is the documented way to stop; report it and return.
            print("\n\n[OK] 守护进程已停止")
def _do_push(config, kepub_paths: list[Path], manga_title: str = ""):
    """Send the given KEPUB files to the device via the configured transfer.

    Args:
        config: Loaded application config; only ``config.transfer`` is read.
        kepub_paths: Files to send.
        manga_title: Passed to the backend only when the transfer method is
            ``"usb"``; other methods are called with the paths alone.

    Raises:
        typer.Exit: with code 1 when the backend object has no ``transfer``
            method, or when the transfer raises ``RuntimeError`` or
            ``OSError``.
    """
    method = config.transfer.method
    print(f"\n传输 {len(kepub_paths)} 个文件 (方式: {method})...")
    try:
        transfer = get_transfer(config.transfer)
        send = getattr(transfer, "transfer", None)
        if send is None:
            # Previously this case left dest_paths unbound; fail explicitly.
            raise RuntimeError(f"传输方式 {method} 不支持文件传输")
        if method == "usb":
            dest_paths = send(kepub_paths, manga_title)
        else:
            dest_paths = send(kepub_paths)
    except (RuntimeError, OSError) as e:
        # OSError is included so I/O failures while sending files are
        # reported the same way instead of escaping as a traceback.
        print(f"[!] 传输失败: {e}")
        raise typer.Exit(1) from e

    print(f"[OK] 传输完成: {len(dest_paths)} 个文件")
    for p in dest_paths:
        print(f" -> {p}")