"""SQL 查询操作
所有数据库读写封装为函数,接收 Database 实例作为第一个参数。
"""
import json
from kobo_manga.db.database import Database
from kobo_manga.models import Chapter, MangaInfo, PageImage
# ── Manga ─────────────────────────────────────────────────
def upsert_manga(db: Database, manga: MangaInfo) -> None:
    """Insert a manga's metadata row, or refresh it if it already exists.

    Rows are keyed on ``(id, source)``. On conflict every metadata column is
    overwritten with the incoming value and ``updated_at`` is set to the
    current time. The change is committed before returning.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga: Metadata to store. ``manga.tags`` is serialized as a JSON array.
    """
    # A missing or empty tag collection is stored as an empty JSON array.
    serialized_tags = json.dumps(manga.tags or [], ensure_ascii=False)
    params = (
        manga.id,
        manga.source,
        manga.title,
        manga.alt_title,
        manga.author,
        manga.cover_url,
        manga.description,
        serialized_tags,
        manga.url,
    )
    statement = """\
        INSERT INTO manga (id, source, title, alt_title, author, cover_url,
                           description, tags, url)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(id, source) DO UPDATE SET
            title=excluded.title, alt_title=excluded.alt_title,
            author=excluded.author, cover_url=excluded.cover_url,
            description=excluded.description, tags=excluded.tags,
            url=excluded.url, updated_at=datetime('now')
        """
    db.conn.execute(statement, params)
    db.conn.commit()
def get_manga(db: Database, manga_id: str, source: str) -> MangaInfo | None:
    """Look up a single manga by its ID and source.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against the ``id`` column.
        source: Value matched against the ``source`` column.

    Returns:
        A ``MangaInfo`` built from the matching row, or ``None`` when no row
        matches. The ``tags`` column is decoded from JSON; an empty or NULL
        value yields an empty list.
    """
    cursor = db.conn.execute(
        "SELECT * FROM manga WHERE id=? AND source=?",
        (manga_id, source),
    )
    record = cursor.fetchone()
    if record:
        raw_tags = record["tags"]
        return MangaInfo(
            id=record["id"],
            title=record["title"],
            source=record["source"],
            url=record["url"],
            alt_title=record["alt_title"],
            author=record["author"],
            cover_url=record["cover_url"],
            description=record["description"],
            tags=json.loads(raw_tags) if raw_tags else [],
        )
    return None
# ── Chapter ───────────────────────────────────────────────
def upsert_chapters(
    db: Database, manga_id: str, source: str, chapters: list[Chapter]
) -> None:
    """Insert or refresh a batch of chapters as one atomic unit.

    Rows are keyed on ``(id, manga_source)``. When a chapter already exists,
    its title, number, type, URL and page count are overwritten and
    ``updated_at`` is refreshed, unless the stored ``status`` is
    ``'downloaded'``, in which case the existing row is left untouched.

    The whole batch runs inside a single transaction: either every chapter
    is written and committed, or (if any row fails) nothing from this call
    is persisted and the exception propagates.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: ID of the manga the chapters belong to.
        source: Source identifier stored in ``manga_source``.
        chapters: Chapters to write. An empty list writes nothing.
    """
    statement = """\
        INSERT INTO chapter (id, manga_id, manga_source, title,
                             chapter_number, chapter_type, url, page_count)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(id, manga_source) DO UPDATE SET
            title=excluded.title, chapter_number=excluded.chapter_number,
            chapter_type=excluded.chapter_type,
            url=excluded.url, page_count=excluded.page_count,
            updated_at=datetime('now')
        WHERE chapter.status != 'downloaded'
        """
    # Build every parameter tuple up front so an attribute error on a bad
    # chapter object surfaces before anything is written.
    batch = [
        (
            ch.id,
            manga_id,
            source,
            ch.title,
            ch.chapter_number,
            ch.chapter_type,
            ch.url,
            ch.page_count,
        )
        for ch in chapters
    ]
    # Used as a context manager, a sqlite3 connection commits on success and
    # rolls back on an exception, so a failing row cannot leave a partial
    # batch pending in the open transaction.
    with db.conn:
        db.conn.executemany(statement, batch)
def get_downloaded_chapter_ids(
    db: Database, manga_id: str, source: str
) -> set[str]:
    """Collect the IDs of a manga's chapters whose status is ``'downloaded'``.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``chapter.manga_id``.
        source: Value matched against ``chapter.manga_source``.

    Returns:
        The set of matching chapter IDs; empty when there are none.
    """
    cursor = db.conn.execute(
        "SELECT id FROM chapter WHERE manga_id=? AND manga_source=? AND status='downloaded'",
        (manga_id, source),
    )
    return {record["id"] for record in cursor}
def set_chapter_status(
    db: Database,
    chapter_id: str,
    source: str,
    status: str,
    download_path: str | None = None,
) -> None:
    """Set a chapter's status and refresh its ``updated_at`` timestamp.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Value matched against ``chapter.id``.
        source: Value matched against ``chapter.manga_source``.
        status: New status value to store.
        download_path: When given, also stored in ``download_path``; when
            ``None`` the stored path is left as it is.
    """
    if download_path is None:
        # No path supplied: touch only the status and timestamp.
        statement = (
            "UPDATE chapter SET status=?, updated_at=datetime('now') "
            "WHERE id=? AND manga_source=?"
        )
        params = (status, chapter_id, source)
    else:
        statement = (
            "UPDATE chapter SET status=?, download_path=?, updated_at=datetime('now') "
            "WHERE id=? AND manga_source=?"
        )
        params = (status, download_path, chapter_id, source)
    db.conn.execute(statement, params)
    db.conn.commit()
def get_chapters(
    db: Database, manga_id: str, source: str
) -> list[Chapter]:
    """Load every stored chapter of a manga, ordered by chapter number.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``chapter.manga_id``.
        source: Value matched against ``chapter.manga_source``.

    Returns:
        One ``Chapter`` per matching row, sorted by ``chapter_number``.
        Only ``id``, ``title``, ``chapter_number``, ``chapter_type``, ``url``
        and ``page_count`` are mapped; other columns are not carried over.
    """
    cursor = db.conn.execute(
        "SELECT * FROM chapter WHERE manga_id=? AND manga_source=? "
        "ORDER BY chapter_number",
        (manga_id, source),
    )
    return [
        Chapter(
            id=record["id"],
            title=record["title"],
            chapter_number=record["chapter_number"],
            # chapter_type is persisted by upsert_chapters, so it is restored
            # here; otherwise it is lost on a write/read round trip.
            # NOTE(review): assumes Chapter accepts a `chapter_type` keyword,
            # as the attribute read in upsert_chapters suggests; confirm.
            chapter_type=record["chapter_type"],
            url=record["url"],
            page_count=record["page_count"],
        )
        for record in cursor.fetchall()
    ]
# ── Page ──────────────────────────────────────────────────
def upsert_pages(
    db: Database, chapter_id: str, source: str, pages: list[PageImage]
) -> None:
    """Register a batch of page records as one atomic unit.

    Uses ``INSERT OR IGNORE``, so a page whose row already exists is skipped
    and its stored values are not overwritten.

    The whole batch runs inside a single transaction: either every new page
    is written and committed, or (if any row raises) nothing from this call
    is persisted and the exception propagates.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Stored in ``page.chapter_id`` for every row.
        source: Stored in ``page.chapter_source`` for every row.
        pages: Pages to register; only ``page_number`` and ``url`` are read.
            An empty list writes nothing.
    """
    statement = """\
        INSERT OR IGNORE INTO page (chapter_id, chapter_source,
                                    page_number, url)
        VALUES (?, ?, ?, ?)
        """
    batch = [
        (chapter_id, source, page.page_number, page.url) for page in pages
    ]
    # Used as a context manager, a sqlite3 connection commits on success and
    # rolls back on an exception, so a failing row cannot leave a partial
    # batch pending in the open transaction.
    with db.conn:
        db.conn.executemany(statement, batch)
def get_pending_pages(
    db: Database, chapter_id: str, source: str
) -> list[PageImage]:
    """Fetch the pages of a chapter that still need downloading.

    A page qualifies when its ``status`` is ``'pending'`` or ``'failed'``.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Value matched against ``page.chapter_id``.
        source: Value matched against ``page.chapter_source``.

    Returns:
        ``PageImage`` objects for the qualifying rows, ordered by page number.
    """
    cursor = db.conn.execute(
        "SELECT * FROM page WHERE chapter_id=? AND chapter_source=? "
        "AND status IN ('pending', 'failed') ORDER BY page_number",
        (chapter_id, source),
    )
    outstanding = []
    for record in cursor.fetchall():
        outstanding.append(
            PageImage(
                chapter_id=record["chapter_id"],
                page_number=record["page_number"],
                url=record["url"],
            )
        )
    return outstanding
def mark_page_downloaded(
    db: Database,
    chapter_id: str,
    source: str,
    page_number: int,
    local_path: str,
) -> None:
    """Record that a page was downloaded and where its file was saved.

    Sets ``status`` to ``'downloaded'`` and stores ``local_path`` on the row
    identified by chapter, source and page number, then commits.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Value matched against ``page.chapter_id``.
        source: Value matched against ``page.chapter_source``.
        page_number: Value matched against ``page.page_number``.
        local_path: Path stored in the ``local_path`` column.
    """
    statement = (
        "UPDATE page SET status='downloaded', local_path=? "
        "WHERE chapter_id=? AND chapter_source=? AND page_number=?"
    )
    params = (local_path, chapter_id, source, page_number)
    db.conn.execute(statement, params)
    db.conn.commit()
def mark_page_failed(
    db: Database, chapter_id: str, source: str, page_number: int
) -> None:
    """Record that downloading a page failed.

    Sets ``status`` to ``'failed'`` on the row identified by chapter, source
    and page number, then commits.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Value matched against ``page.chapter_id``.
        source: Value matched against ``page.chapter_source``.
        page_number: Value matched against ``page.page_number``.
    """
    statement = (
        "UPDATE page SET status='failed' "
        "WHERE chapter_id=? AND chapter_source=? AND page_number=?"
    )
    params = (chapter_id, source, page_number)
    db.conn.execute(statement, params)
    db.conn.commit()
def are_all_pages_downloaded(
    db: Database, chapter_id: str, source: str
) -> bool:
    """Tell whether every registered page of a chapter has been downloaded.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        chapter_id: Value matched against ``page.chapter_id``.
        source: Value matched against ``page.chapter_source``.

    Returns:
        ``True`` only when the chapter has at least one page row and all of
        its rows have status ``'downloaded'``; ``False`` otherwise, including
        when no page rows exist.
    """
    counts = db.conn.execute(
        "SELECT COUNT(*) as total, "
        "SUM(CASE WHEN status='downloaded' THEN 1 ELSE 0 END) as done "
        "FROM page WHERE chapter_id=? AND chapter_source=?",
        (chapter_id, source),
    ).fetchone()
    page_total = counts["total"]
    # With zero rows the SUM is NULL, so bail out before comparing.
    if not page_total > 0:
        return False
    return page_total == counts["done"]
# ── CLI 查询 ─────────────────────────────────────────────
def list_all_manga(db: Database) -> list[MangaInfo]:
    """Return every manga stored in the library, most recently updated first.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.

    Returns:
        One ``MangaInfo`` per row of the ``manga`` table, ordered by
        ``updated_at`` descending. The ``tags`` column is decoded from JSON;
        an empty or NULL value yields an empty list.
    """
    cursor = db.conn.execute("SELECT * FROM manga ORDER BY updated_at DESC")
    library = []
    for record in cursor.fetchall():
        raw_tags = record["tags"]
        library.append(
            MangaInfo(
                id=record["id"],
                title=record["title"],
                source=record["source"],
                url=record["url"],
                alt_title=record["alt_title"],
                author=record["author"],
                cover_url=record["cover_url"],
                description=record["description"],
                tags=json.loads(raw_tags) if raw_tags else [],
            )
        )
    return library
def find_manga_by_title(
    db: Database, keyword: str
) -> list[MangaInfo]:
    """Search the local library for manga whose title contains ``keyword``.

    The keyword is matched literally as a substring using SQL ``LIKE``.
    The characters ``%`` and ``_`` in the keyword are escaped so they match
    themselves instead of acting as wildcards (searching for ``100%`` finds
    titles containing ``100%``, not every title containing ``100``).

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        keyword: Text to look for in the ``title`` column.

    Returns:
        Matching manga as ``MangaInfo`` objects, ordered by ``updated_at``
        descending. The ``tags`` column is decoded from JSON; an empty or
        NULL value yields an empty list.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    literal = (
        keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    rows = db.conn.execute(
        "SELECT * FROM manga WHERE title LIKE ? ESCAPE '\\' "
        "ORDER BY updated_at DESC",
        (f"%{literal}%",),
    ).fetchall()
    return [
        MangaInfo(
            id=row["id"],
            title=row["title"],
            source=row["source"],
            url=row["url"],
            alt_title=row["alt_title"],
            author=row["author"],
            cover_url=row["cover_url"],
            description=row["description"],
            tags=json.loads(row["tags"]) if row["tags"] else [],
        )
        for row in rows
    ]
def get_manga_chapter_stats(
    db: Database, manga_id: str, source: str
) -> dict:
    """Count a manga's chapters overall and per download status.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``chapter.manga_id``.
        source: Value matched against ``chapter.manga_source``.

    Returns:
        A dict with the integer keys ``"total"``, ``"downloaded"``,
        ``"failed"`` and ``"pending"``. Every value is 0 when the manga has
        no chapter rows.
    """
    counts = db.conn.execute(
        "SELECT "
        "  COUNT(*) as total, "
        "  SUM(CASE WHEN status='downloaded' THEN 1 ELSE 0 END) as downloaded, "
        "  SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) as failed, "
        "  SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) as pending "
        "FROM chapter WHERE manga_id=? AND manga_source=?",
        (manga_id, source),
    ).fetchone()
    # SUM over zero rows is NULL, so normalise missing counts to 0.
    return {
        key: counts[key] or 0
        for key in ("total", "downloaded", "failed", "pending")
    }
# ── Subscription ─────────────────────────────────────────
def subscribe_manga(
    db: Database, manga_id: str, source: str, auto_push: bool = False
) -> None:
    """Create a subscription for a manga, or update its auto-push flag.

    Subscriptions are keyed on ``(manga_id, source)``. Subscribing again to
    the same manga only overwrites ``auto_push``. The change is committed
    before returning.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: ID of the manga to subscribe to.
        source: Source identifier of the manga.
        auto_push: Flag stored as an integer in the ``auto_push`` column.
    """
    push_flag = int(auto_push)
    statement = """\
        INSERT INTO subscription (manga_id, source, auto_push)
        VALUES (?, ?, ?)
        ON CONFLICT(manga_id, source) DO UPDATE SET
            auto_push=excluded.auto_push
        """
    db.conn.execute(statement, (manga_id, source, push_flag))
    db.conn.commit()
def unsubscribe_manga(
    db: Database, manga_id: str, source: str
) -> bool:
    """Remove a manga's subscription.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``subscription.manga_id``.
        source: Value matched against ``subscription.source``.

    Returns:
        ``True`` if a subscription row was deleted, ``False`` if none matched.
    """
    outcome = db.conn.execute(
        "DELETE FROM subscription WHERE manga_id=? AND source=?",
        (manga_id, source),
    )
    db.conn.commit()
    removed = outcome.rowcount
    return removed > 0
def get_subscription(
    db: Database, manga_id: str, source: str
) -> dict | None:
    """Fetch one subscription record.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``subscription.manga_id``.
        source: Value matched against ``subscription.source``.

    Returns:
        The matching row converted to a dict keyed by column name, or
        ``None`` when the manga is not subscribed.
    """
    cursor = db.conn.execute(
        "SELECT * FROM subscription WHERE manga_id=? AND source=?",
        (manga_id, source),
    )
    record = cursor.fetchone()
    return dict(record) if record else None
def list_subscriptions(db: Database) -> list[dict]:
    """List all subscriptions together with manga details and chapter counts.

    Each subscription is joined to its ``manga`` row on ID and source, so a
    subscription without a matching manga row is not returned.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.

    Returns:
        One dict per subscription, newest ``subscribed_at`` first, holding
        the subscription columns (``manga_id``, ``source``, ``subscribed_at``,
        ``last_checked``, ``last_chapter_id``, ``auto_push``), the manga's
        ``title``, ``author`` and ``url``, plus ``total_chapters`` and
        ``downloaded_chapters`` counted from the ``chapter`` table.
    """
    query = """\
        SELECT s.manga_id, s.source, s.subscribed_at, s.last_checked,
               s.last_chapter_id, s.auto_push,
               m.title, m.author, m.url,
               (SELECT COUNT(*) FROM chapter c
                WHERE c.manga_id=s.manga_id AND c.manga_source=s.source) as total_chapters,
               (SELECT COUNT(*) FROM chapter c
                WHERE c.manga_id=s.manga_id AND c.manga_source=s.source
                AND c.status='downloaded') as downloaded_chapters
        FROM subscription s
        JOIN manga m ON s.manga_id = m.id AND s.source = m.source
        ORDER BY s.subscribed_at DESC
        """
    cursor = db.conn.execute(query)
    return list(map(dict, cursor.fetchall()))
def update_subscription_checked(
    db: Database,
    manga_id: str,
    source: str,
    last_chapter_id: str | None = None,
) -> None:
    """Stamp a subscription as checked now, optionally noting the newest chapter.

    Args:
        db: Database wrapper; its connection is reached through ``db.conn``.
        manga_id: Value matched against ``subscription.manga_id``.
        source: Value matched against ``subscription.source``.
        last_chapter_id: When given, also stored in ``last_chapter_id``; when
            ``None`` the stored chapter ID is left as it is.
    """
    if last_chapter_id is None:
        # Nothing new to record: only refresh the check timestamp.
        statement = (
            "UPDATE subscription SET last_checked=datetime('now') "
            "WHERE manga_id=? AND source=?"
        )
        params = (manga_id, source)
    else:
        statement = (
            "UPDATE subscription SET last_checked=datetime('now'), "
            "last_chapter_id=? WHERE manga_id=? AND source=?"
        )
        params = (last_chapter_id, manga_id, source)
    db.conn.execute(statement, params)
    db.conn.commit()