"""SQL 查询操作 所有数据库读写封装为函数,接收 Database 实例作为第一个参数。 """ import json from kobo_manga.db.database import Database from kobo_manga.models import Chapter, MangaInfo, PageImage # ── Manga ───────────────────────────────────────────────── def upsert_manga(db: Database, manga: MangaInfo) -> None: """插入或更新漫画元数据。""" tags_json = json.dumps(manga.tags, ensure_ascii=False) if manga.tags else "[]" db.conn.execute( """\ INSERT INTO manga (id, source, title, alt_title, author, cover_url, description, tags, url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id, source) DO UPDATE SET title=excluded.title, alt_title=excluded.alt_title, author=excluded.author, cover_url=excluded.cover_url, description=excluded.description, tags=excluded.tags, url=excluded.url, updated_at=datetime('now') """, (manga.id, manga.source, manga.title, manga.alt_title, manga.author, manga.cover_url, manga.description, tags_json, manga.url), ) db.conn.commit() def get_manga(db: Database, manga_id: str, source: str) -> MangaInfo | None: """按 ID 和来源获取漫画。""" row = db.conn.execute( "SELECT * FROM manga WHERE id=? AND source=?", (manga_id, source), ).fetchone() if not row: return None return MangaInfo( id=row["id"], title=row["title"], source=row["source"], url=row["url"], alt_title=row["alt_title"], author=row["author"], cover_url=row["cover_url"], description=row["description"], tags=json.loads(row["tags"]) if row["tags"] else [], ) # ── Chapter ─────────────────────────────────────────────── def upsert_chapters( db: Database, manga_id: str, source: str, chapters: list[Chapter] ) -> None: """批量插入/更新章节。已下载的章节不覆盖状态。""" for ch in chapters: db.conn.execute( """\ INSERT INTO chapter (id, manga_id, manga_source, title, chapter_number, chapter_type, url, page_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id, manga_source) DO UPDATE SET title=excluded.title, chapter_number=excluded.chapter_number, chapter_type=excluded.chapter_type, url=excluded.url, page_count=excluded.page_count, updated_at=datetime('now') WHERE chapter.status != 'downloaded' """, (ch.id, manga_id, source, ch.title, ch.chapter_number, ch.chapter_type, ch.url, ch.page_count), ) db.conn.commit() def get_downloaded_chapter_ids( db: Database, manga_id: str, source: str ) -> set[str]: """返回已下载章节的 ID 集合。""" rows = db.conn.execute( "SELECT id FROM chapter WHERE manga_id=? AND manga_source=? AND status='downloaded'", (manga_id, source), ).fetchall() return {row["id"] for row in rows} def set_chapter_status( db: Database, chapter_id: str, source: str, status: str, download_path: str | None = None, ) -> None: """更新章节状态。""" if download_path is not None: db.conn.execute( "UPDATE chapter SET status=?, download_path=?, updated_at=datetime('now') " "WHERE id=? AND manga_source=?", (status, download_path, chapter_id, source), ) else: db.conn.execute( "UPDATE chapter SET status=?, updated_at=datetime('now') " "WHERE id=? AND manga_source=?", (status, chapter_id, source), ) db.conn.commit() def get_chapters( db: Database, manga_id: str, source: str ) -> list[Chapter]: """获取漫画的所有章节,按章节号排序。""" rows = db.conn.execute( "SELECT * FROM chapter WHERE manga_id=? AND manga_source=? " "ORDER BY chapter_number", (manga_id, source), ).fetchall() return [ Chapter( id=row["id"], title=row["title"], chapter_number=row["chapter_number"], url=row["url"], page_count=row["page_count"], ) for row in rows ] # ── Page ────────────────────────────────────────────────── def upsert_pages( db: Database, chapter_id: str, source: str, pages: list[PageImage] ) -> None: """批量插入页面记录。已存在的不覆盖(INSERT OR IGNORE)。""" for page in pages: db.conn.execute( """\ INSERT OR IGNORE INTO page (chapter_id, chapter_source, page_number, url) VALUES (?, ?, ?, ?) """, (chapter_id, source, page.page_number, page.url), ) db.conn.commit() def get_pending_pages( db: Database, chapter_id: str, source: str ) -> list[PageImage]: """获取未下载的页面(status 为 pending 或 failed)。""" rows = db.conn.execute( "SELECT * FROM page WHERE chapter_id=? AND chapter_source=? " "AND status IN ('pending', 'failed') ORDER BY page_number", (chapter_id, source), ).fetchall() return [ PageImage( chapter_id=row["chapter_id"], page_number=row["page_number"], url=row["url"], ) for row in rows ] def mark_page_downloaded( db: Database, chapter_id: str, source: str, page_number: int, local_path: str, ) -> None: """标记页面已下载。""" db.conn.execute( "UPDATE page SET status='downloaded', local_path=? " "WHERE chapter_id=? AND chapter_source=? AND page_number=?", (local_path, chapter_id, source, page_number), ) db.conn.commit() def mark_page_failed( db: Database, chapter_id: str, source: str, page_number: int ) -> None: """标记页面下载失败。""" db.conn.execute( "UPDATE page SET status='failed' " "WHERE chapter_id=? AND chapter_source=? AND page_number=?", (chapter_id, source, page_number), ) db.conn.commit() def are_all_pages_downloaded( db: Database, chapter_id: str, source: str ) -> bool: """检查章节的所有页面是否都已下载。""" row = db.conn.execute( "SELECT COUNT(*) as total, " "SUM(CASE WHEN status='downloaded' THEN 1 ELSE 0 END) as done " "FROM page WHERE chapter_id=? AND chapter_source=?", (chapter_id, source), ).fetchone() return row["total"] > 0 and row["total"] == row["done"] # ── CLI 查询 ───────────────────────────────────────────── def list_all_manga(db: Database) -> list[MangaInfo]: """列出所有已入库的漫画。""" rows = db.conn.execute( "SELECT * FROM manga ORDER BY updated_at DESC" ).fetchall() return [ MangaInfo( id=row["id"], title=row["title"], source=row["source"], url=row["url"], alt_title=row["alt_title"], author=row["author"], cover_url=row["cover_url"], description=row["description"], tags=json.loads(row["tags"]) if row["tags"] else [], ) for row in rows ] def find_manga_by_title( db: Database, keyword: str ) -> list[MangaInfo]: """按标题模糊搜索本地漫画库。""" rows = db.conn.execute( "SELECT * FROM manga WHERE title LIKE ? ORDER BY updated_at DESC", (f"%{keyword}%",), ).fetchall() return [ MangaInfo( id=row["id"], title=row["title"], source=row["source"], url=row["url"], alt_title=row["alt_title"], author=row["author"], cover_url=row["cover_url"], description=row["description"], tags=json.loads(row["tags"]) if row["tags"] else [], ) for row in rows ] def get_manga_chapter_stats( db: Database, manga_id: str, source: str ) -> dict: """获取漫画的章节统计信息。""" row = db.conn.execute( "SELECT " " COUNT(*) as total, " " SUM(CASE WHEN status='downloaded' THEN 1 ELSE 0 END) as downloaded, " " SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) as failed, " " SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) as pending " "FROM chapter WHERE manga_id=? AND manga_source=?", (manga_id, source), ).fetchone() return { "total": row["total"] or 0, "downloaded": row["downloaded"] or 0, "failed": row["failed"] or 0, "pending": row["pending"] or 0, } # ── Subscription ───────────────────────────────────────── def subscribe_manga( db: Database, manga_id: str, source: str, auto_push: bool = False ) -> None: """订阅漫画。漫画必须已在 manga 表中。""" db.conn.execute( """\ INSERT INTO subscription (manga_id, source, auto_push) VALUES (?, ?, ?) ON CONFLICT(manga_id, source) DO UPDATE SET auto_push=excluded.auto_push """, (manga_id, source, int(auto_push)), ) db.conn.commit() def unsubscribe_manga( db: Database, manga_id: str, source: str ) -> bool: """取消订阅。返回是否删除了记录。""" cursor = db.conn.execute( "DELETE FROM subscription WHERE manga_id=? AND source=?", (manga_id, source), ) db.conn.commit() return cursor.rowcount > 0 def get_subscription( db: Database, manga_id: str, source: str ) -> dict | None: """获取单个订阅记录。""" row = db.conn.execute( "SELECT * FROM subscription WHERE manga_id=? AND source=?", (manga_id, source), ).fetchone() if not row: return None return dict(row) def list_subscriptions(db: Database) -> list[dict]: """列出所有订阅,关联漫画信息和章节统计。""" rows = db.conn.execute( """\ SELECT s.manga_id, s.source, s.subscribed_at, s.last_checked, s.last_chapter_id, s.auto_push, m.title, m.author, m.url, (SELECT COUNT(*) FROM chapter c WHERE c.manga_id=s.manga_id AND c.manga_source=s.source) as total_chapters, (SELECT COUNT(*) FROM chapter c WHERE c.manga_id=s.manga_id AND c.manga_source=s.source AND c.status='downloaded') as downloaded_chapters FROM subscription s JOIN manga m ON s.manga_id = m.id AND s.source = m.source ORDER BY s.subscribed_at DESC """ ).fetchall() return [dict(row) for row in rows] def update_subscription_checked( db: Database, manga_id: str, source: str, last_chapter_id: str | None = None, ) -> None: """更新订阅的最后检查时间(及最新章节 ID)。""" if last_chapter_id is not None: db.conn.execute( "UPDATE subscription SET last_checked=datetime('now'), " "last_chapter_id=? WHERE manga_id=? AND source=?", (last_chapter_id, manga_id, source), ) else: db.conn.execute( "UPDATE subscription SET last_checked=datetime('now') " "WHERE manga_id=? AND source=?", (manga_id, source), ) db.conn.commit()