~cytrogen/kobo-manga

ref: 4e504823f4bf8d2b5f4279da3f4d4ebe98fc97ad kobo-manga/src/kobo_manga/pipeline.py -rw-r--r-- 8.7 KiB
4e504823 — HallowDem Initial commit: kobo-manga pipeline a day ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""全流程编排器

串联 搜索 → 下载 → 图片处理 → KEPUB 打包 的完整流水线。
"""

from pathlib import Path

from kobo_manga.config import AppConfig
from kobo_manga.converter.kepub import KepubBuilder
from kobo_manga.db.database import Database
from kobo_manga.db.queries import (
    get_downloaded_chapter_ids,
    get_manga,
    get_manga_chapter_stats,
    update_subscription_checked,
    upsert_chapters,
    upsert_manga,
)
from kobo_manga.downloader.engine import DownloadEngine
from kobo_manga.models import Chapter, DownloadResult, MangaInfo
from kobo_manga.processor.pipeline import ImageProcessor
from kobo_manga.sources import get_source


class MangaPipeline:
    """End-to-end orchestrator: search -> download -> image processing -> KEPUB.

    The pipeline wires together the configured sources, the download engine,
    the image processor and the KEPUB builder. Downloaded images are stored
    under ``<base_dir>/downloads`` and generated books under
    ``<base_dir>/output``.
    """

    # Source used whenever the configuration does not list any source.
    _DEFAULT_SOURCE = "manhuagui"

    def __init__(
        self,
        config: AppConfig,
        db: Database,
        base_dir: Path = Path("."),
    ):
        """Store the collaborators and derive the working directories.

        Args:
            config: Application configuration (sources, processing, device).
            db: Database handle passed on to the query helpers and the
                download engine.
            base_dir: Root directory under which ``downloads`` and ``output``
                are placed.
        """
        self.config = config
        self.db = db
        self.downloads_dir = base_dir / "downloads"
        self.output_dir = base_dir / "output"

    def _source_name(self) -> str:
        """Return the first configured source name, or the default source."""
        sources = self.config.sources
        return sources[0] if sources else self._DEFAULT_SOURCE

    async def search(
        self, keyword: str, source_name: str | None = None
    ) -> list[MangaInfo]:
        """Search for manga by keyword.

        Args:
            keyword: Search term forwarded to the source.
            source_name: Source to query. When falsy, every configured source
                is queried (or the default source if none is configured) and
                the results are concatenated.

        Returns:
            The search results. In the multi-source case a failing source is
            reported on stdout and skipped; with an explicit ``source_name``
            errors propagate to the caller.
        """
        if source_name:
            async with get_source(source_name) as source:
                return await source.search(keyword)

        # Aggregate over all sources; best effort, one broken source must not
        # hide the results of the others.
        results: list[MangaInfo] = []
        for name in self.config.sources or [self._DEFAULT_SOURCE]:
            try:
                async with get_source(name) as source:
                    results.extend(await source.search(keyword))
            except Exception as e:
                print(f"  [!] {name} 搜索失败: {e}")
        return results

    async def get_manga_info(
        self, manga_url: str, source_name: str | None = None
    ) -> MangaInfo:
        """Fetch manga details (including the chapter list) and record them.

        Args:
            manga_url: URL of the manga on the source site.
            source_name: Source to use; falls back to the default source.

        Returns:
            The manga info returned by the source, after it has been passed
            to ``upsert_manga``.
        """
        name = source_name or self._source_name()
        async with get_source(name) as source:
            manga = await source.get_manga_info(manga_url)
            upsert_manga(self.db, manga)
            return manga

    async def download_and_convert(
        self,
        manga_url: str,
        source_name: str | None = None,
        chapter_range: tuple[float, float] | None = None,
        chapter_ids: list[str] | None = None,
        chapter_type: str | None = None,
    ) -> list[Path]:
        """Run the full flow: download -> image processing -> KEPUB packaging.

        Args:
            manga_url: URL of the manga.
            source_name: Source name; ``None`` selects the default source.
            chapter_range: Chapter number range ``(start, end)``, inclusive.
            chapter_ids: Explicit list of chapter IDs to fetch.
            chapter_type: Chapter type filter (volume/chapter/extra).

        Returns:
            Paths of the ``.kepub.epub`` files that exist after the run.
        """
        name = source_name or self._source_name()
        async with get_source(name) as source:
            engine = DownloadEngine(
                db=self.db,
                source=source,
                config=self.config,
                base_dir=self.downloads_dir,
            )
            result = await engine.download_manga(
                manga_url,
                chapter_range=chapter_range,
                chapter_ids=chapter_ids,
                chapter_type=chapter_type,
            )

        # Processing is purely local, so the source is released first.
        return self._process_results(result)

    def _process_results(self, result: DownloadResult) -> list[Path]:
        """Process and package every successfully downloaded chapter.

        Only chapter results whose status is ``"downloaded"`` and that carry a
        download path are considered. A chapter whose KEPUB file already
        exists is skipped (its path is still returned). A failure in one
        chapter is reported on stdout and does not abort the others.

        Args:
            result: Outcome of ``DownloadEngine.download_manga``.

        Returns:
            Paths of the KEPUB files, existing or newly built.
        """
        successful = [
            cr for cr in result.chapter_results
            if cr.status == "downloaded" and cr.download_path
        ]
        if not successful:
            return []

        safe_title = _sanitize_filename(result.manga.title)
        manga_output = self.output_dir / safe_title
        total = len(successful)
        kepub_paths: list[Path] = []

        for i, cr in enumerate(successful, 1):
            # The chapter ID is part of the name so that chapters sharing a
            # title do not collide.
            # NOTE(review): this assumes KepubBuilder.build writes the same
            # file name; otherwise the skip check never matches - confirm.
            kepub_name = (
                f"{safe_title} - "
                f"{cr.chapter.id} {_sanitize_filename(cr.chapter.title)}.kepub.epub"
            )
            kepub_path = manga_output / kepub_name
            if kepub_path.exists():
                print(f"  [SKIP] 已存在: {cr.chapter.title}")
                kepub_paths.append(kepub_path)
                continue

            print(
                f"\n[{i}/{total}] "
                f"处理+打包: {cr.chapter.title}"
            )

            try:
                path = self._process_and_convert(
                    result.manga, cr.chapter, cr.download_path
                )
                kepub_paths.append(path)
                print(f"  [OK] {path.name}")
            except Exception as e:
                # Best effort: keep going with the remaining chapters.
                print(f"  [!] 处理失败: {e}")

        return kepub_paths

    def _process_and_convert(
        self,
        manga: MangaInfo,
        chapter: Chapter,
        download_path: Path,
    ) -> Path:
        """Process the images of one chapter and package them as a KEPUB.

        Args:
            manga: Manga the chapter belongs to.
            chapter: Chapter being converted.
            download_path: Directory holding the downloaded page images.

        Returns:
            Path of the KEPUB file returned by the builder.

        Raises:
            ValueError: If ``download_path`` contains no image files.
        """
        processor = ImageProcessor(
            self.config.processing, self.config.device
        )

        # Collect the downloaded pages.
        # NOTE(review): the sort is lexicographic on the path, so page order
        # is only correct if file names are zero-padded - confirm with the
        # downloader's naming scheme.
        image_paths = sorted(
            p for p in download_path.iterdir()
            if p.suffix.lower() in (".jpg", ".jpeg", ".png", ".webp")
        )

        if not image_paths:
            raise ValueError(f"目录中无图片文件: {download_path}")

        # Image processing.
        # NOTE(review): the processed directory is keyed by chapter title
        # only, so two chapters with the same title share it - confirm this
        # is acceptable.
        processed_dir = (
            self.downloads_dir.parent
            / "processed"
            / _sanitize_filename(manga.title)
            / _sanitize_filename(chapter.title)
        )
        processed_paths = processor.process_chapter(image_paths, processed_dir)
        print(f"  处理: {len(image_paths)} -> {len(processed_paths)} 张")

        # KEPUB packaging.
        output_dir = self.output_dir / _sanitize_filename(manga.title)
        builder = KepubBuilder(manga, chapter, self.config.device)
        kepub_path = builder.build(processed_paths, output_dir)
        print(f"  打包: {kepub_path.name}")

        return kepub_path

    async def check_and_download_updates(
        self, manga_id: str, source_name: str
    ) -> list[Path]:
        """Check a subscribed manga for new chapters, download and convert them.

        Args:
            manga_id: ID of the manga as stored in the database.
            source_name: Name of the source the manga belongs to.

        Returns:
            Paths of the ``.kepub.epub`` files for the new chapters; an empty
            list when the manga is unknown or has no new chapters.
        """
        # 1. Load the manga record from the database.
        manga = get_manga(self.db, manga_id, source_name)
        if not manga:
            print(f"  [!] 漫画不存在: {manga_id} ({source_name})")
            return []

        # 2. Fetch the current chapter list from the source.
        async with get_source(source_name) as source:
            fresh_manga = await source.get_manga_info(manga.url)

        # 3. Refresh the database.
        upsert_manga(self.db, fresh_manga)
        upsert_chapters(
            self.db, fresh_manga.id, fresh_manga.source, fresh_manga.chapters
        )

        # 4. New chapters are those not yet recorded as downloaded.
        downloaded_ids = get_downloaded_chapter_ids(
            self.db, fresh_manga.id, fresh_manga.source
        )
        new_chapters = [
            ch for ch in fresh_manga.chapters
            if ch.id not in downloaded_ids
        ]

        if not new_chapters:
            print(f"  [OK] {fresh_manga.title}: 无新章节")
            update_subscription_checked(self.db, manga_id, source_name)
            return []

        print(f"  发现 {len(new_chapters)} 个新章节")
        new_chapter_ids = [ch.id for ch in new_chapters]

        # 5. Download the new chapters.
        async with get_source(source_name) as source:
            engine = DownloadEngine(
                db=self.db,
                source=source,
                config=self.config,
                base_dir=self.downloads_dir,
            )
            result = await engine.download_manga(
                fresh_manga.url,
                chapter_ids=new_chapter_ids,
            )

        # 6. Process and package.
        kepub_paths = self._process_results(result)

        # 7. Update the subscription state. ``new_chapters`` is known to be
        #    non-empty here because the empty case returned above.
        update_subscription_checked(
            self.db,
            manga_id,
            source_name,
            last_chapter_id=new_chapters[-1].id,
        )

        return kepub_paths

    def get_stats(self, manga: MangaInfo) -> dict:
        """Return the chapter statistics of ``manga`` from the database."""
        return get_manga_chapter_stats(self.db, manga.id, manga.source)


def _sanitize_filename(name: str) -> str:
    """清理文件名中的非法字符。"""
    return "".join(
        c if c.isalnum() or c in " _-()()【】" else "_" for c in name
    )