@@ 0,0 1,277 @@
+"""FastAPI Web UI 应用
+
+sourcehut 风格渐进增强:
+- 基线:纯 HTML 表单 + 页面刷新,关闭 JS 也能完整使用
+- 增强:HTMX 局部刷新 + SSE 实时进度
+"""
+
+import asyncio
+import json
+import logging
+import re
+from contextlib import asynccontextmanager
+from pathlib import Path
+
+from fastapi import FastAPI, Form, Query, Request
+from fastapi.responses import HTMLResponse, RedirectResponse, Response, StreamingResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
+
+from kobo_manga.config import load_config
+from kobo_manga.db.database import Database
+from kobo_manga.db.queries import get_manga
+from kobo_manga.pipeline import MangaPipeline
+from kobo_manga.web.cover_proxy import fetch_cover
+from kobo_manga.web.tasks import (
+ cancel_task,
+ create_download_task,
+ get_all_tasks,
+ get_task,
+)
+
+logger = logging.getLogger(__name__)
+
+WEB_DIR = Path(__file__).parent
+TEMPLATES_DIR = WEB_DIR / "templates"
+STATIC_DIR = WEB_DIR / "static"
+
+
+def _render(request: Request, name: str, **ctx):
+ """渲染模板的便捷方法。"""
+ return templates.TemplateResponse(request=request, name=name, context=ctx)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """应用生命周期管理。"""
+ logger.info("Web UI 启动")
+ db = Database()
+ db.initialize()
+ app.state.db = db
+ yield
+ db.close()
+ logger.info("Web UI 关闭")
+
+
+app = FastAPI(title="kobo-manga", lifespan=lifespan)
+app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
+templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
+
+
+# 用来把 /kobo/<auth_token>/... 里的 token 段脱敏,避免进日志。
+_KOBO_TOKEN_RE = re.compile(r"/kobo/[^/?#]+")
+
+
+def _redact_kobo_token(url: str) -> str:
+ return _KOBO_TOKEN_RE.sub("/kobo/<redacted>", url)
+
+
+@app.middleware("http")
+async def log_all_requests(request: Request, call_next):
+ """记录所有 HTTP 请求(debug 级,脱敏 Kobo auth_token)。"""
+ if logger.isEnabledFor(logging.DEBUG):
+ safe_url = _redact_kobo_token(str(request.url))
+ client = request.client.host if request.client else "?"
+ logger.debug(">>> %s %s (from %s)", request.method, safe_url, client)
+ return await call_next(request)
+
+
+def _manga_url(source: str, manga_id: str) -> str:
+ """根据源和 ID 构造漫画 URL。"""
+ urls = {
+ "manhuagui": f"https://www.manhuagui.com/comic/{manga_id}/",
+ "mangadex": f"https://mangadex.org/title/{manga_id}",
+ }
+ return urls.get(source, "")
+
+
+def _is_htmx(request: Request) -> bool:
+ """检查是否为 HTMX 请求。"""
+ return request.headers.get("HX-Request") == "true"
+
+
+def _get_db() -> Database:
+ """获取共享数据库连接。回退到新建连接(测试场景)。"""
+ from starlette.requests import Request as _ # ensure app is importable
+ try:
+ return app.state.db
+ except AttributeError:
+ db = Database()
+ db.initialize()
+ return db
+
+
+# ── 路由 ─────────────────────────────────────────────────
+
+
+@app.get("/", response_class=HTMLResponse)
+async def index(request: Request):
+ """搜索首页。"""
+ return _render(request, "search.html", results=None, query="", error=None)
+
+
+@app.get("/search", response_class=HTMLResponse)
+async def search(
+ request: Request,
+ q: str = Query(default=""),
+ source: str = Query(default=""),
+):
+ """搜索漫画。"""
+ if not q.strip():
+ return _render(request, "search.html", results=None, query=q, error=None)
+
+ config = load_config()
+ db = _get_db()
+ try:
+ pipeline = MangaPipeline(config, db)
+ source_name = source if source else None
+ results = await pipeline.search(q.strip(), source_name=source_name)
+ except Exception as e:
+ logger.error("搜索失败: %s", e)
+ return _render(request, "search.html", results=[], query=q, error=str(e))
+
+ if _is_htmx(request):
+ return _render(request, "_search_results.html", results=results, query=q, error=None)
+
+ return _render(request, "search.html", results=results, query=q, error=None)
+
+
+@app.get("/manga/{source}/{manga_id}", response_class=HTMLResponse)
+async def manga_detail(request: Request, source: str, manga_id: str):
+ """漫画详情页。"""
+ config = load_config()
+ db = _get_db()
+ try:
+ pipeline = MangaPipeline(config, db)
+ manga = get_manga(db, manga_id, source)
+ url = manga.url if (manga and manga.url) else _manga_url(source, manga_id)
+ if not url:
+ return _render(request, "manga.html", manga=None, error=f"未知源: {source}")
+ manga = await pipeline.get_manga_info(url, source_name=source)
+ except Exception as e:
+ logger.error("获取漫画详情失败: %s", e)
+ return _render(request, "manga.html", manga=None, error=str(e))
+
+ return _render(request, "manga.html", manga=manga, error=None)
+
+
+@app.post("/download")
+async def download(
+ request: Request,
+ manga_id: str = Form(...),
+ source: str = Form(...),
+ chapters: str = Form(default="all"),
+ chapter_type: str = Form(default=""),
+):
+ """提交下载任务。"""
+ if chapters and chapters != "all":
+ try:
+ from kobo_manga.utils import parse_chapter_range
+ parse_chapter_range(chapters)
+ except (ValueError, IndexError):
+ if _is_htmx(request):
+ return HTMLResponse(
+ '<div class="error">无效的章节范围格式</div>',
+ status_code=400,
+ )
+ return RedirectResponse(
+ f"/manga/{source}/{manga_id}?error=invalid_range",
+ status_code=303,
+ )
+
+ task = create_download_task(
+ manga_id=manga_id,
+ source=source,
+ chapters=chapters,
+ chapter_type=chapter_type if chapter_type else None,
+ )
+
+ if _is_htmx(request):
+ return HTMLResponse(
+ f'<div class="success">任务已创建: {task.id}</div>'
+ f'<script>setTimeout(()=>window.location="/tasks",1000)</script>'
+ )
+
+ return RedirectResponse("/tasks", status_code=303)
+
+
+@app.get("/tasks", response_class=HTMLResponse)
+async def task_list(request: Request):
+ """任务列表页。"""
+ tasks = get_all_tasks()
+ has_active = any(t.status in ("pending", "downloading", "converting") for t in tasks)
+ return _render(request, "tasks.html", tasks=tasks, has_active=has_active)
+
+
+@app.get("/tasks/{task_id}/events")
+async def task_events(task_id: str):
+ """SSE 进度流。"""
+ async def event_stream():
+ last_status = None
+ last_progress = -1
+
+ while True:
+ task = get_task(task_id)
+ if not task:
+ yield f"event: error\ndata: {json.dumps({'error': 'task not found'})}\n\n"
+ return
+
+ if task.status != last_status or task.progress != last_progress:
+ last_status = task.status
+ last_progress = task.progress
+
+ data = {
+ "task_id": task.id,
+ "status": task.status,
+ "progress": task.progress,
+ "manga_title": task.manga_title,
+ "current_chapter": task.current_chapter,
+ "chapters_done": task.chapters_done,
+ "chapters_total": task.chapters_total,
+ "error_message": task.error_message,
+ }
+ yield f"event: progress\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+
+ if task.status in ("done", "failed", "cancelled"):
+ yield f"event: {task.status}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+ return
+
+ await asyncio.sleep(2)
+
+ return StreamingResponse(
+ event_stream(),
+ media_type="text/event-stream",
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+ )
+
+
+@app.post("/tasks/{task_id}/cancel")
+async def cancel(request: Request, task_id: str):
+ """取消下载任务。"""
+ cancel_task(task_id)
+ if _is_htmx(request):
+ return HTMLResponse('<div class="success">任务已取消</div>')
+ return RedirectResponse("/tasks", status_code=303)
+
+
+# ── Kobo Sync Router ────────────────────────────────────
+from kobo_manga.web.kobo_sync import router as kobo_sync_router
+app.include_router(kobo_sync_router)
+
+
+@app.get("/api/cover")
+async def cover_proxy(
+ url: str = Query(...),
+ source: str = Query(default=""),
+):
+ """封面图片代理。"""
+ result = await fetch_cover(url, source)
+ if result is None:
+ return Response(status_code=404)
+
+ content_type, data = result
+ return Response(
+ content=data,
+ media_type=content_type,
+ headers={"Cache-Control": "public, max-age=86400"},
+ )
@@ 0,0 1,838 @@
+"""Kobo Sync Protocol 实现
+
+实现 Kobo 设备的同步协议端点,让 Kobo 通过 WiFi 自动拉取新书。
+参考 calibre-web (cps/kobo.py) 和 Komga 的 Kobo sync 实现。
+"""
+
+import base64
+import json as json_mod
+import logging
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+
+from fastapi import APIRouter, Depends, Request, Response
+from fastapi.responses import FileResponse, JSONResponse
+
+from kobo_manga.db.database import Database
+from kobo_manga.db.queries import (
+ get_device_by_token,
+ get_pending_syncs,
+ mark_synced,
+ update_device_sync_time,
+)
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/kobo/{auth_token}", tags=["kobo-sync"])
+
+# Kobo 官方 API 的基础 URL(用于 initialization 响应中不需要代理的服务)
+KOBO_STOREAPI_URL = "https://storeapi.kobo.com"
+
+
+def _get_db(request: Request) -> Database:
+ """从 app state 获取共享 DB。"""
+ return request.app.state.db
+
+
+def _utc_now() -> str:
+ """返回 UTC ISO 8601 时间戳。"""
+ return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+
+
+def _validate_token(auth_token: str, db: Database) -> dict | None:
+ """验证 auth_token,返回设备信息或 None。"""
+ return get_device_by_token(db, auth_token)
+
+
+# ── Initialization ──────────────────────────────────────
+
+
+@router.get("/v1/initialization")
+async def initialization(
+ request: Request,
+ auth_token: str,
+ db: Database = Depends(_get_db),
+):
+ """设备初始化,返回服务器能力和资源 URL。
+
+ Kobo 设备首次连接时调用此端点获取各种服务的 URL。
+ 使用完整的 Kobo 官方资源列表作为基础,只覆盖图书同步相关的 URL。
+ 参考 calibre-web cps/kobo.py NATIVE_KOBO_RESOURCES。
+ """
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ base_url = str(request.base_url).rstrip("/")
+ kobo_base = f"{base_url}/kobo/{auth_token}"
+
+ # 从完整的 Kobo 原始资源开始,只覆盖我们需要代理的
+ resources = _native_kobo_resources()
+ resources.update({
+ "image_host": base_url,
+ "image_url_quality_template": f"{kobo_base}/v1/library/{{ImageId}}/thumbnail/{{Width}}/{{Height}}/false/image.jpg",
+ "image_url_template": f"{kobo_base}/v1/library/{{ImageId}}/thumbnail/{{Width}}/{{Height}}/false/image.jpg",
+ "library_sync": f"{kobo_base}/v1/library/sync",
+ "library_items": f"{kobo_base}/v1/library/{{BookId}}",
+ "content_access_book": f"{kobo_base}/v1/library/{{BookId}}/content",
+ "reading_state": f"{kobo_base}/v1/library/{{BookId}}/state",
+ "tags": f"{kobo_base}/v1/library/tags",
+ # 下载相关:Kobo 下载管理器可能用这些而不是 DownloadUrls
+ "get_download_keys": f"{kobo_base}/v1/library/downloadkeys",
+ "get_download_link": f"{kobo_base}/v1/library/downloadlink",
+ })
+
+ return JSONResponse(
+ {"Resources": resources},
+ headers={"x-kobo-apitoken": "e30="},
+ )
+
+
+def _native_kobo_resources() -> dict:
+ """Kobo 官方 API 的完整资源列表(fallback,不走代理时使用)。
+
+ 从 calibre-web 的 NATIVE_KOBO_RESOURCES 精简而来,
+ 包含 Kobo 固件期望存在的所有 key。
+ """
+ store = KOBO_STOREAPI_URL
+ return {
+ "account_page": "https://www.kobo.com/account/settings",
+ "account_page_rakuten": "https://my.rakuten.co.jp/",
+ "add_entitlement": f"{store}/v1/library/{{BookId}}",
+ "affiliaterequest": f"{store}/v1/affiliate",
+ "audiobook_landing_page": "https://www.kobo.com/audiobooks",
+ "audiobook_subscription_orange_deal_inclusion_url": "https://www.kobo.com/{{region}}/{{language}}/audiobook-subscription",
+ "autocomplete": f"{store}/v1/products/autocomplete",
+ "blackstone_header": {
+ "key": "x-]8YT}6eK9Cj",
+ "value": "SmFjaz1AcnlASE8jbmpCUUBrcGNRQHQ=",
+ },
+ "book": f"{store}/v1/products/books/{{ProductId}}",
+ "book_detail_page": "https://www.kobo.com/{{region}}/{{language}}/ebook/{{slug}}",
+ "book_detail_page_rakuten": "http://books.rakuten.co.jp/rk/{{crossrevisionid}}",
+ "book_landing_page": "https://www.kobo.com/ebooks",
+ "book_subscription": f"{store}/v1/products/books/series/{{SeriesId}}",
+ "categories": f"{store}/v1/categories",
+ "categories_page": "https://www.kobo.com/{{region}}/{{language}}/categories",
+ "category": f"{store}/v1/categories/{{CategoryId}}",
+ "category_featured_lists": f"{store}/v1/categories/{{CategoryId}}/featured",
+ "category_products": f"{store}/v1/categories/{{CategoryId}}/products",
+ "checkout_borrowed_book": f"{store}/v1/library/{{BookId}}/checkout",
+ "configuration_data": f"{store}/v1/configuration",
+ "content_access_book": f"{store}/v1/products/books/{{ProductId}}/access",
+ "customer_care_live_chat": "https://help.kobo.com/hc/en-us",
+ "daily_deal": f"{store}/v1/deals/dailydeal",
+ "deals": f"{store}/v1/deals",
+ "delete_entitlement": f"{store}/v1/library/{{BookId}}",
+ "delete_tag": f"{store}/v1/library/tags/{{TagId}}",
+ "delete_tag_items": f"{store}/v1/library/tags/{{TagId}}/items/delete",
+ "device_auth": "https://storeapi.kobo.com/v1/auth/device",
+ "device_refresh": "https://storeapi.kobo.com/v1/auth/refresh",
+ "dictionary_host": "https://ereaderfiles.kobo.com",
+ "discovery_host": f"{store}",
+ "eula_page": "https://www.kobo.com/termsofuse?style=onestore",
+ "exchange_auth": "https://storeapi.kobo.com/v1/auth/exchange",
+ "external_book": f"{store}/v1/products/books/external/{{Ids}}",
+ "facebook_sso_page": "https://authorize.kobo.com/signin/provider/Facebook/login?returnUrl=http://store.kobobooks.com",
+ "featured_list": f"{store}/v1/products/featured/{{FeaturedListId}}",
+ "featured_lists": f"{store}/v1/products/featured",
+ "free_books_page": {
+ "EN": "https://www.kobo.com/{{region}}/{{language}}/p/free-ebooks",
+ "FR": "https://www.kobo.com/{{region}}/{{language}}/p/livres-gratuits",
+ "DE": "https://www.kobo.com/{{region}}/{{language}}/p/gratis-ebooks",
+ "NL": "https://www.kobo.com/{{region}}/{{language}}/p/gratis-ebooks",
+ "IT": "https://www.kobo.com/{{region}}/{{language}}/p/libri-gratuiti",
+ "ES": "https://www.kobo.com/{{region}}/{{language}}/p/libros-gratis",
+ "PT": "https://www.kobo.com/{{region}}/{{language}}/p/livros-gratis",
+ "JA": "https://www.kobo.com/{{region}}/{{language}}/p/free-ebooks",
+ },
+ "fte_feedback": f"{store}/v1/products/ftefeedback",
+ "get_tests_request": f"{store}/v1/analytics/gettests",
+ "giftcard_epd_redeem_url": "https://www.kobo.com/{{region}}/{{language}}/redeem-ereader",
+ "giftcard_redeem_url": "https://www.kobo.com/{{region}}/{{language}}/redeem",
+ "help_page": "http://www.kobo.com/help",
+ "image_host": "https://cdn.kobo.com/book-images",
+ "image_url_quality_template": "https://cdn.kobo.com/book-images/{ImageId}/{Width}/{Height}/{Quality}/{IsGreyscale}/image.jpg",
+ "image_url_template": "https://cdn.kobo.com/book-images/{ImageId}/{Width}/{Height}/false/image.jpg",
+ "kobo_audiobooks_enabled": "False",
+ "kobo_audiobooks_orange_deal_enabled": "False",
+ "kobo_audiobooks_subscriptions_enabled": "False",
+ "kobo_nativeborrow_enabled": "True",
+ "kobo_onestorelibrary_enabled": "False",
+ "kobo_redeem_enabled": "True",
+ "kobo_shelfie_enabled": "False",
+ "kobo_subscriptions_enabled": "False",
+ "kobo_superpoints_enabled": "False",
+ "kobo_wishlist_enabled": "True",
+ "library_book": f"{store}/v1/user/library/{{BookId}}",
+ "library_items": f"{store}/v1/user/library",
+ "library_metadata": f"{store}/v1/library/{{BookId}}/metadata",
+ "library_prices": f"{store}/v1/user/library/previews/price",
+ "library_search": f"{store}/v1/library/search",
+ "library_sync": f"{store}/v1/library/sync",
+ "love_dashboard_page": "https://www.kobo.com/{{region}}/{{language}}/kobosuperpoints",
+ "love_points_redemption_page": "https://www.kobo.com/{{region}}/{{language}}/KoboSuperPointsRedemption?productId={{ProductId}}",
+ "magazine_landing_page": "https://www.kobo.com/emagazines",
+ "notifications_registration_issue": f"{store}/v1/notifications/registration",
+ "oauth_host": "https://oauth.kobo.com",
+ "password_retrieval_page": "https://www.kobo.com/passwordretrieval.html",
+ "post_analytics_event": f"{store}/v1/analytics/event",
+ "privacy_page": "https://www.kobo.com/privacypolicy?style=onestore",
+ "product_nextread": f"{store}/v1/products/{{ProductId}}/nextread",
+ "product_prices": f"{store}/v1/products/{{ProductId}}/prices",
+ "product_recommendations": f"{store}/v1/products/{{ProductId}}/recommendations",
+ "product_reviews": f"{store}/v1/products/{{ProductIds}}/reviews",
+ "products": f"{store}/v1/products",
+ "provider_external_sign_in_page": "https://authorize.kobo.com/ExternalSignIn/{{ProviderName}}?returnUrl=http://store.kobobooks.com",
+ "purchase_buy_templated": "https://www.kobo.com/{{region}}/{{language}}/checkoutoption/{{ProductId}}",
+ "quickbuy_checkout": f"{store}/v1/store/quickbuy/{{PurchaseId}}/checkout",
+ "quickbuy_create": f"{store}/v1/store/quickbuy",
+ "rating": f"{store}/v1/products/{{ProductId}}/rating/{{Rating}}",
+ "reading_services_host": "https://readingservices.kobo.com",
+ "reading_state": f"{store}/v1/library/{{BookId}}/state",
+ "registration_page": "https://authorize.kobo.com/signup?returnUrl=http://store.kobobooks.com",
+ "related_items": f"{store}/v1/products/{{Id}}/related",
+ "remaining_book_series": f"{store}/v1/products/books/series/{{SeriesId}}",
+ "rename_tag": f"{store}/v1/library/tags/{{TagId}}",
+ "review": f"{store}/v1/products/reviews/{{ReviewId}}",
+ "review_sentiment": f"{store}/v1/products/reviews/{{ReviewId}}/sentiment/{{Sentiment}}",
+ "shelfie_recommendations": f"{store}/v1/user/recommendations/shelfie",
+ "sign_in_page": "https://authorize.kobo.com/signin?returnUrl=http://store.kobobooks.com",
+ "social_authorization_host": "https://social.kobobooks.com/soc498",
+ "social_host": "https://social.kobobooks.com",
+ "store_home": "www.kobo.com/{{region}}/{{language}}",
+ "store_host": "www.kobo.com",
+ "store_newreleases": "https://www.kobo.com/{{region}}/{{language}}/List/new-releases/702l7BQRF0qxkBhIjFyniQ",
+ "store_search": "https://www.kobo.com/{{region}}/{{language}}/Search?Query={{query}}",
+ "store_top50": "https://www.kobo.com/{{region}}/{{language}}/List/top-50/aZzhelcQ71qJjlYulfCpEQ",
+ "tag_items": f"{store}/v1/library/tags/{{TagId}}/Items",
+ "tags": f"{store}/v1/library/tags",
+ "taste_profile": f"{store}/v1/products/tasteprofile",
+ "update_accessibility_to_preview": f"{store}/v1/library/{{BookId}}/preview",
+ "use_one_store": "False",
+ "user_loyalty_benefits": f"{store}/v1/user/loyalty/benefits",
+ "user_platform": f"{store}/v1/user/platform",
+ "user_profile": f"{store}/v1/user/profile",
+ "user_ratings": f"{store}/v1/user/ratings",
+ "user_recommendations": f"{store}/v1/user/recommendations",
+ "user_reviews": f"{store}/v1/user/reviews",
+ "user_wishlist": f"{store}/v1/user/wishlist",
+ "userguide_host": "https://ereaderfiles.kobo.com",
+ "wishlist_page": "https://www.kobo.com/{{region}}/{{language}}/account/wishlist",
+ }
+
+
+# ── Library Sync ────────────────────────────────────────
+
+
+def _build_entitlement(book_id: str, kepub_path: str, download_url: str) -> dict:
+ """构建 Kobo NewEntitlement 响应对象。
+
+ 格式参考 calibre-web cps/kobo.py get_metadata()。
+ 所有 UUID 字段使用同一个 book_id 保持一致。
+ """
+ now = _utc_now()
+ file_size = 0
+ path = Path(kepub_path)
+ if path.exists():
+ file_size = path.stat().st_size
+
+ # 从文件名推断标题
+ title = path.stem.replace(".kepub", "")
+
+ return {
+ "NewEntitlement": {
+ "BookEntitlement": {
+ "Accessibility": "Full",
+ "ActivePeriod": {"From": now},
+ "Created": now,
+ "CrossRevisionId": book_id,
+ "Id": book_id,
+ "IsHiddenFromArchive": False,
+ "IsLocked": False,
+ "IsRemoved": False,
+ "LastModified": now,
+ "OriginCategory": "Imported",
+ "RevisionId": book_id,
+ "Status": "Active",
+ },
+ "BookMetadata": {
+ "Categories": ["00000000-0000-0000-0000-000000000001"],
+ "ContributorRoles": [{"Name": "kobo-manga"}],
+ "Contributors": ["kobo-manga"],
+ "CoverImageId": book_id,
+ "CrossRevisionId": book_id,
+ "CurrentDisplayPrice": {"CurrencyCode": "USD", "TotalAmount": 0},
+ "CurrentLoveDisplayPrice": {"TotalAmount": 0},
+ "Description": "",
+ # Format 必须是 EPUB3FL(不能写 KEPUB),因为我们打的
+ # 是 pre-paginated EPUB3 固定布局漫画。calibre-web 在
+ # cps/kobo.py:445-459 get_metadata() 里有这段分支:
+ #
+ # for kobo_format in KOBO_FORMATS[book_data.format]:
+ # if get_epub_layout(book, book_data) == 'pre-paginated':
+ # kobo_format = 'EPUB3FL'
+ # download_urls.append({
+ # "Format": kobo_format,
+ # "Size": book_data.uncompressed_size,
+ # "Url": get_download_url_for_book(...),
+ # "Platform": "Generic",
+ # })
+ #
+ # 如果写成 KEPUB,Kobo 固件 mime 校验失败,会把 entitlement
+ # 写入 content 表但 DownloadUrl 字段被填成字面字符串 "true",
+ # 永远不会触发实际下载,也不会进 SyncQueue。
+ # URL 路径里的 format segment 仍是 'kepub' 小写 —— calibre-web
+ # 也是这样:传给 get_download_url_for_book 的是 book_data.format
+ # 原始值,不是 kobo_format。
+ "DownloadUrls": [
+ {
+ "Format": "EPUB3FL",
+ "Size": file_size,
+ "Url": download_url,
+ "Platform": "Generic",
+ }
+ ],
+ "EntitlementId": book_id,
+ "ExternalIds": [],
+ "Genre": "00000000-0000-0000-0000-000000000001",
+ "IsEligibleForKoboLove": False,
+ "IsInternetArchive": False,
+ "IsPreOrder": False,
+ "IsSocialEnabled": True,
+ "Language": "zh",
+ "PhoneticPronunciations": {},
+ "PublicationDate": now[:10] + "T00:00:00Z",
+ "Publisher": {"Imprint": "", "Name": "kobo-manga"},
+ "RevisionId": book_id,
+ "Title": title,
+ "WorkId": book_id,
+ },
+ "ReadingState": {
+ "Created": now,
+ "CurrentBookmark": {"LastModified": now},
+ "EntitlementId": book_id,
+ "LastModified": now,
+ "PriorityTimestamp": now,
+ "Statistics": {"LastModified": now},
+ "StatusInfo": {
+ "LastModified": now,
+ "Status": "ReadyToRead",
+ "TimesStartedReading": 0,
+ },
+ },
+ }
+ }
+
+
+@router.get("/v1/library/sync")
+async def library_sync(
+ request: Request,
+ auth_token: str,
+ db: Database = Depends(_get_db),
+):
+ """返回待同步书籍列表。
+
+ Kobo 设备定期调用此端点检查新书。
+ 返回 NewEntitlement 列表,设备会自动下载。
+ """
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ base_url = str(request.base_url).rstrip("/")
+ pending = get_pending_syncs(db, device["device_id"])
+
+ entitlements = []
+ for item in pending:
+ download_url = (
+ f"{base_url}/kobo/{auth_token}/download/"
+ f"{item['book_id']}/kepub"
+ )
+ entitlement = _build_entitlement(
+ item["book_id"], item["kepub_path"], download_url
+ )
+ entitlements.append(entitlement)
+ # 不在这里 mark_synced:Kobo 的下载流程是
+ # sync → metadata → download
+ # 三步异步进行,每一步可能在不同的 HTTP 连接里。如果在 sync
+ # 阶段就 mark_synced,下次 get_pending_syncs 会返回空,那本书
+ # 永远不会出现在后续 sync 响应里。Kobo 一旦在某次 sync 中收到
+ # 错误的 entitlement(比如 Format 字段错),就会把它写进 content
+ # 表的"中毒"行(DownloadUrl='true'),而后续 sync 因为返回空而
+ # 没机会用修正后的 entitlement 覆盖它。
+ # mark_synced 移到 download_book 端点(line ~430),等设备
+ # 真的拉到 KEPUB 文件之后才标记。
+ # calibre-web 用 SyncToken 时间戳做 delta 同步(cps/kobo.py
+ # SyncToken.from_headers / generate_sync_response),我们没
+ # 实现 sync_token delta,副作用是同一本未下载的书会在每次
+ # sync 都重复返回,但 Kobo 客户端按 RevisionId 去重,不会
+ # 重复加进库。
+
+ update_device_sync_time(db, device["device_id"])
+
+ logger.info(
+ "Kobo sync: 设备 %s, %d 本待同步",
+ device["device_name"], len(entitlements),
+ )
+
+ # sync token 格式必须和 calibre-web 一致,包含 version 字段
+ # Kobo 固件依赖这个格式来决定是否处理 DownloadUrls
+ import time
+ now_ts = time.time()
+ sync_token_data = {
+ "data": {
+ "archive_last_modified": -62135596800.0,
+ "books_last_created": -62135596800.0,
+ "books_last_modified": -62135596800.0,
+ "raw_kobo_store_token": "",
+ "reading_state_last_modified": now_ts,
+ "tags_last_modified": -62135596800.0,
+ },
+ "version": "1-1-0",
+ }
+ sync_token = base64.b64encode(
+ json_mod.dumps(sync_token_data).encode()
+ ).decode()
+
+ headers = {
+ "x-kobo-synctoken": sync_token,
+ "x-kobo-apitoken": "e30=",
+ }
+
+ return Response(
+ content=json_mod.dumps(entitlements, ensure_ascii=False),
+ media_type="application/json",
+ headers=headers,
+ )
+
+
+# ── Download ────────────────────────────────────────────
+
+
+@router.get("/download/{book_id}/{download_format}")
+async def download_book_short(
+ auth_token: str,
+ book_id: str,
+ download_format: str,
+ db: Database = Depends(_get_db),
+):
+ """简短路径下载(calibre-web 兼容格式)。"""
+ return await download_book(auth_token, book_id, download_format, db)
+
+
+@router.get("/v1/library/{book_id}/download/{download_format}")
+async def download_book(
+ auth_token: str,
+ book_id: str,
+ download_format: str,
+ db: Database = Depends(_get_db),
+):
+ """提供 KEPUB 文件下载。
+
+ Kobo 的下载流程:sync → metadata → download。设备只有在前两步
+ 都成功并把书加入 SyncQueue 后,才会用 DownloadUrls[].Url 拉取
+ 实际文件。
+
+ mark_synced 必须在这个端点而不是 library_sync 里调用,原因见
+ library_sync 内部注释。calibre-web 用 sync_token 时间戳做 delta,
+ 不需要显式 mark;我们没实现 sync_token delta,所以用"设备真的
+ 下载到文件"这个事件作为 mark_synced 的触发点。
+ """
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ # 查找文件
+ pending = get_pending_syncs(db, device["device_id"])
+ target = next((p for p in pending if p["book_id"] == book_id), None)
+
+ # 也查已同步的(设备可能重新下载)
+ if not target:
+ row = db.conn.execute(
+ "SELECT * FROM kobo_sync_state WHERE device_id=? AND book_id=?",
+ (device["device_id"], book_id),
+ ).fetchone()
+ target = dict(row) if row else None
+
+ if not target:
+ return JSONResponse({"error": "Book not found"}, status_code=404)
+
+ kepub_path = Path(target["kepub_path"])
+ if not kepub_path.exists():
+ logger.error("KEPUB 文件不存在: %s", kepub_path)
+ return JSONResponse({"error": "File not found"}, status_code=404)
+
+ # 标记为已同步
+ mark_synced(db, device["device_id"], book_id)
+
+ logger.info("Kobo download: %s -> %s", device["device_name"], kepub_path.name)
+
+ return FileResponse(
+ path=str(kepub_path),
+ media_type="application/epub+zip",
+ filename=kepub_path.name,
+ )
+
+
+# ── Stub Endpoints ──────────────────────────────────────
+
+
+@router.put("/v1/library/{book_id}/state")
+async def update_state(
+ auth_token: str,
+ book_id: str,
+ request: Request,
+ db: Database = Depends(_get_db),
+):
+ """接收阅读进度(stub,仅返回 200)。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+ return JSONResponse({"result": "ok"})
+
+
+@router.get("/v1/library/tags")
+async def get_tags(
+ auth_token: str,
+ db: Database = Depends(_get_db),
+):
+ """返回书架标签(stub,返回空列表)。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+ return JSONResponse([])
+
+
+@router.get("/v1/library/{book_id}/thumbnail/{width}/{height}/{is_grey}/image.jpg")
+async def get_thumbnail(
+ auth_token: str,
+ book_id: str,
+ width: int,
+ height: int,
+ is_grey: str,
+ db: Database = Depends(_get_db),
+):
+ """封面缩略图:从对应 KEPUB zip 中提取 OEBPS/Images/cover.jpg。
+
+ Kobo 在 sync 接受 entitlement 后会立即请求两次封面(一次小尺寸
+ 355x530 给列表视图,一次大尺寸 1072/1448 给详情视图)。
+ 如果封面返回 404,Kobo UI 会显示空白书名占位,并且**可能阻塞后续
+ 的 download 触发逻辑** —— 我们实测当封面是 404 时,metadata 端点
+ 被反复重试但 download 永不发起。
+
+ calibre-web 在 cps/kobo.py HandleCoverImageRequest 里用 Pillow
+ 处理封面(resize/grayscale),我们简化处理:直接从 kepub zip 提取
+ OEBPS/Images/cover.jpg 然后按 Kobo 请求的尺寸 thumbnail。
+ is_grey 参数我们暂时忽略(Kobo 自己会做 e-ink 灰度转换)。
+ """
+ import io
+ import zipfile
+
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ # 找到这本书对应的 kepub 路径(pending 或已 synced 都查)
+ row = db.conn.execute(
+ "SELECT kepub_path FROM kobo_sync_state WHERE device_id=? AND book_id=?",
+ (device["device_id"], book_id),
+ ).fetchone()
+ if not row:
+ return Response(status_code=404)
+
+ kepub_path = Path(row["kepub_path"])
+ if not kepub_path.exists():
+ return Response(status_code=404)
+
+ try:
+ with zipfile.ZipFile(kepub_path, "r") as zf:
+ # 优先 cover.jpg,回退第一张图片
+ names = zf.namelist()
+ cover_name = next(
+ (n for n in names if n.endswith("OEBPS/Images/cover.jpg")),
+ None,
+ )
+ if cover_name is None:
+ cover_name = next(
+ (n for n in names if "Images/" in n and n.lower().endswith(
+ (".jpg", ".jpeg", ".png", ".webp")
+ )),
+ None,
+ )
+ if cover_name is None:
+ return Response(status_code=404)
+ data = zf.read(cover_name)
+ except (zipfile.BadZipFile, KeyError, OSError) as e:
+ logger.warning("读取封面失败 %s: %s", kepub_path, e)
+ return Response(status_code=404)
+
+ # 按 Kobo 请求的尺寸缩放(保持比例)
+ try:
+ from PIL import Image
+ img = Image.open(io.BytesIO(data))
+ img.thumbnail((width, height), Image.LANCZOS)
+ if img.mode != "RGB":
+ img = img.convert("RGB")
+ buf = io.BytesIO()
+ img.save(buf, format="JPEG", quality=85)
+ data = buf.getvalue()
+ except Exception as e:
+ logger.warning("缩放封面失败 %s: %s(返回原图)", kepub_path, e)
+
+ return Response(content=data, media_type="image/jpeg")
+
+
+# Kobo 设备可能请求的其他端点,返回空响应避免无限重试
+@router.get("/v1/library/{book_id}")
+async def get_book_entitlement(
+ auth_token: str,
+ book_id: str,
+ db: Database = Depends(_get_db),
+):
+ """单本书的 entitlement(stub)。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+ return JSONResponse({})
+
+
+@router.get("/v1/library/{book_id}/content")
+async def get_content_access(
+ auth_token: str,
+ book_id: str,
+ db: Database = Depends(_get_db),
+):
+ """内容访问权限(stub)。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+ return JSONResponse({})
+
+
+# ── Kobo 官方 API 兼容 Stubs ──────────────────────────────
+# Kobo 设备会请求这些端点,即使 initialization 里指向了官方服务器。
+# 返回合理的空响应避免 404 和重试。
+
+
+@router.get("/v1/user/wishlist")
+async def get_wishlist(auth_token: str):
+ """用户愿望清单(stub)。"""
+ return JSONResponse({"Items": [], "TotalCount": 0})
+
+
+@router.get("/v1/user/recommendations")
+async def get_recommendations(auth_token: str):
+ """用户推荐(stub)。"""
+ return JSONResponse([])
+
+
+@router.get("/v1/user/profile")
+async def get_user_profile(auth_token: str):
+ """用户资料(stub)。"""
+ return JSONResponse({})
+
+
+@router.get("/v1/user/loyalty/benefits")
+async def get_loyalty_benefits(auth_token: str):
+ """会员权益(stub)。"""
+ return JSONResponse({"Benefits": {}})
+
+
+@router.get("/v1/deals")
+async def get_deals(auth_token: str):
+ """促销信息(stub)。"""
+ return JSONResponse([])
+
+
+@router.post("/v1/analytics/event")
+async def analytics_event(auth_token: str, request: Request):
+ """分析事件(stub,接收但忽略)。"""
+ return JSONResponse({"result": "ok"})
+
+
+@router.post("/v1/analytics/gettests")
+async def analytics_gettests(auth_token: str, request: Request):
+ """A/B 测试配置(stub)。"""
+ return JSONResponse({"Results": []})
+
+
+@router.get("/v1/products/{product_id}/nextread")
+async def get_nextread(auth_token: str, product_id: str):
+ """下一本推荐(stub)。"""
+ return JSONResponse([])
+
+
+@router.get("/v1/library/{book_id}/metadata")
+async def get_book_metadata(
+ request: Request,
+ auth_token: str,
+ book_id: str,
+ db: Database = Depends(_get_db),
+):
+ """书籍元数据(Kobo 下载前请求)。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ # 从同步队列找到对应书籍
+ pending = get_pending_syncs(db, device["device_id"])
+ target = next((p for p in pending if p["book_id"] == book_id), None)
+ if not target:
+ row = db.conn.execute(
+ "SELECT * FROM kobo_sync_state WHERE device_id=? AND book_id=?",
+ (device["device_id"], book_id),
+ ).fetchone()
+ target = dict(row) if row else None
+
+ if not target:
+ return JSONResponse({}, status_code=404)
+
+ base_url = str(request.base_url).rstrip("/")
+ download_url = f"{base_url}/kobo/{auth_token}/download/{book_id}/kepub"
+ path = Path(target["kepub_path"])
+ title = path.stem.replace(".kepub", "")
+ file_size = path.stat().st_size if path.exists() else 0
+ now = _utc_now()
+
+ # 关键:返回必须是 [metadata] 裸 JSON 数组(长度 1),
+ # 不是裸 dict 也不是 {"BookMetadata": ...} 包装对象。
+ #
+ # calibre-web cps/kobo.py:336-351 HandleMetadataRequest 原文:
+ #
+ # @kobo.route("/v1/library/<book_uuid>/metadata")
+ # @requires_kobo_auth
+ # @download_required
+ # def HandleMetadataRequest(book_uuid):
+ # ...
+ # metadata = get_metadata(book)
+ # response = make_response(json.dumps([metadata], ensure_ascii=False))
+ # response.headers["Content-Type"] = "application/json; charset=utf-8"
+ # return response
+ #
+ # 注意 json.dumps([metadata], ...) 把单个 metadata 包成长度 1 的数组。
+ # 如果返回裸 dict,Kobo parser 会把它当成失败响应反复重试 metadata
+ # 端点(实测日志里同一 book_id 的 metadata 被连续 GET 两次以上),
+ # 永远不会进入 download 阶段。
+ #
+ # Content-Type 也必须是 'application/json; charset=utf-8'(带 charset
+ # 后缀),跟 calibre-web 一致。FastAPI 的 JSONResponse 默认是
+ # 'application/json' 不带 charset,所以这里改用底层 Response 手动
+ # dump + 设 media_type。
+ metadata = {
+ "Categories": ["00000000-0000-0000-0000-000000000001"],
+ "CoverImageId": book_id,
+ "CrossRevisionId": book_id,
+ "CurrentDisplayPrice": {"CurrencyCode": "USD", "TotalAmount": 0},
+ "CurrentLoveDisplayPrice": {"TotalAmount": 0},
+ "Description": "",
+ "DownloadUrls": [{
+ "Format": "EPUB3FL",
+ "Size": file_size,
+ "Url": download_url,
+ "Platform": "Generic",
+ }],
+ "EntitlementId": book_id,
+ "ExternalIds": [],
+ "Genre": "00000000-0000-0000-0000-000000000001",
+ "IsEligibleForKoboLove": False,
+ "IsInternetArchive": False,
+ "IsPreOrder": False,
+ "IsSocialEnabled": True,
+ "Language": "zh",
+ "PhoneticPronunciations": {},
+ "PublicationDate": now[:10] + "T00:00:00Z",
+ "Publisher": {"Imprint": "", "Name": "kobo-manga"},
+ "RevisionId": book_id,
+ "Title": title,
+ "WorkId": book_id,
+ "ContributorRoles": [{"Name": "kobo-manga"}],
+ "Contributors": ["kobo-manga"],
+ }
+ return Response(
+ content=json_mod.dumps([metadata], ensure_ascii=False),
+ media_type="application/json; charset=utf-8",
+ )
+
+
+# ── Download Keys/Link(Kobo 下载管理器使用)──────────
+
+
+@router.post("/v1/library/downloadkeys")
+async def get_download_keys(
+ request: Request,
+ auth_token: str,
+ db: Database = Depends(_get_db),
+):
+ """返回下载密钥。Kobo 下载管理器在下载前可能调用此端点。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ body = await request.json()
+ logger.info("downloadkeys request: %s", body)
+
+ # 返回空密钥列表(无 DRM)
+ return JSONResponse({"Items": []})
+
+
+@router.get("/v1/library/downloadlink")
+@router.post("/v1/library/downloadlink")
+async def get_download_link(
+ request: Request,
+ auth_token: str,
+ db: Database = Depends(_get_db),
+):
+ """返回实际下载链接。Kobo 下载管理器使用此端点获取文件 URL。"""
+ device = _validate_token(auth_token, db)
+ if not device:
+ return JSONResponse({"error": "Invalid token"}, status_code=401)
+
+ base_url = str(request.base_url).rstrip("/")
+
+ # 尝试从 query params 或 body 获取 book_id
+ book_id = request.query_params.get("BookId", "")
+ if not book_id:
+ try:
+ body = await request.json()
+ book_id = body.get("BookId", "")
+ logger.info("downloadlink request body: %s", body)
+ except Exception:
+ pass
+
+ logger.info("downloadlink request for book: %s", book_id)
+
+ if book_id:
+ download_url = f"{base_url}/kobo/{auth_token}/download/{book_id}/kepub"
+ return JSONResponse({
+ "Urls": [{
+ "Format": "KEPUB",
+ "Url": download_url,
+ "Platform": "Generic",
+ }]
+ })
+
+ # 返回所有待同步书的下载链接
+ pending = get_pending_syncs(db, device["device_id"])
+ urls = []
+ for item in pending:
+ urls.append({
+ "BookId": item["book_id"],
+ "Urls": [{
+ "Format": "KEPUB",
+ "Url": f"{base_url}/kobo/{auth_token}/download/{item['book_id']}/kepub",
+ "Platform": "Generic",
+ }]
+ })
+ return JSONResponse({"Items": urls})
+
+
+# ── Catch-all(调试用,捕获所有未匹配的请求)──────────
+
+
+@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
+async def catch_all(auth_token: str, path: str, request: Request):
+ """捕获所有未匹配的 Kobo 请求(调试用)。"""
+ logger.warning("UNHANDLED: %s %s", request.method, request.url)
+ return JSONResponse({})