Files
daily-paper/app/services/pipeline.py
T
Rain-Bus 743d69efd0 refactor: extract admin business logic to services, introduce job queue, add derived index helpers
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations)
- Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job)
- Add services/derived.py with FTS5 reindex and paper index deletion helpers
- Refactor scheduler to use job queue instead of direct pipeline calls
- Add heartbeat_at/expires_at to TaskLock for lock health tracking
- Remove DESIGN_REVIEW.md
- Update tests: remove redundant integration tests, add unit tests for new services
2026-06-13 18:31:43 +08:00

164 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""流水线服务 — crawl → summarize → cleanup 的共享编排逻辑。
供 admin 手动触发和 scheduler 定时调度共用。
"""
from __future__ import annotations
import logging
from datetime import date as date_type
from datetime import timedelta
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.config import settings
from app.models import CrawlLog, TaskLock
from app.services.cleaner import cleanup_tmp
from app.services.crawler import crawl_daily
from app.services.summarizer import summarize_batch
from app.utils import release_lock, truncate_error, utc_now, yesterday_str
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
def acquire_lock(db: Session, task: str, lock_key: str, owner: str) -> TaskLock:
    """Acquire a ``TaskLock`` row, raising ``RuntimeError`` on conflict.

    Shared entry point for operations that must not run re-entrantly
    (crawl, pipeline, ...).

    Args:
        db: Database session used to insert and commit the lock row.
        task: Task name stored on the lock.
        lock_key: Key stored on the lock.
        owner: Identifier of the caller taking the lock.

    Returns:
        The committed ``TaskLock`` instance (status ``"running"``).

    Raises:
        RuntimeError: If committing the lock row raises ``IntegrityError``
            (presumably a uniqueness conflict with an existing lock —
            the constraint itself is defined on the model, not here).
    """
    # Take one timestamp so acquired_at, heartbeat_at and expires_at are
    # mutually consistent: expires_at is exactly 6 hours after acquired_at.
    now = utc_now()
    lock = TaskLock(
        task=task,
        lock_key=lock_key,
        status="running",
        owner=owner,
        acquired_at=now,
        heartbeat_at=now,
        expires_at=now + timedelta(hours=6),
    )
    try:
        db.add(lock)
        db.commit()
    except IntegrityError as exc:
        # Roll back so the session stays usable for the caller.
        db.rollback()
        raise RuntimeError(f"{task} already running for {lock_key}") from exc
    return lock
async def run_crawl(
    db: Session,
    target_date: str,
    owner: str = "admin_crawl",
    top_n: int | None = None,
) -> dict:
    """Run a single crawl for ``target_date`` under a re-entrancy lock.

    Args:
        db: Database session.
        target_date: Target date, formatted YYYY-MM-DD; also used as the
            lock key.
        owner: Identifier of the caller, recorded on the lock.
        top_n: Forwarded unchanged to ``crawl_daily`` as ``top_n``.

    Returns:
        The raw return value of ``crawl_daily()``.
    """
    crawl_lock = acquire_lock(db, "crawl", target_date, owner)
    try:
        outcome = await crawl_daily(db, target_date, top_n=top_n)
    finally:
        # Release the lock whether the crawl succeeded or raised.
        release_lock(db, crawl_lock)
    return outcome
async def run_pipeline(db: Session, target_date: str, owner: str) -> dict:
    """Run the full pipeline: crawl -> summarize -> cleanup.

    Uses a ``TaskLock`` row to prevent re-entrant runs and records the run
    in a ``CrawlLog`` row.

    Args:
        db: Database session.
        target_date: Target date, formatted YYYY-MM-DD.
        owner: Identifier of the caller (e.g. "admin_trigger" or
            "daily_pipeline").

    Returns:
        ``{"status": "success", "message": "Pipeline completed"}`` when all
        steps finish, or ``{"status": "failed", "error": <message>}`` when a
        step raises an ``Exception``.

    Raises:
        ValueError: If ``target_date`` is not a valid ISO date.
        RuntimeError: If a pipeline lock for ``target_date`` already exists.
    """
    # Parse the date before taking the lock: a malformed target_date then
    # fails fast instead of raising after the lock row has been committed,
    # which would leave the lock held.
    log_date = date_type.fromisoformat(target_date)

    now = utc_now()
    lock_key = f"pipeline-{target_date}"

    # -- Acquire the lock ---------------------------------------------------
    lock = TaskLock(
        task="scheduler",
        lock_key=lock_key,
        status="running",
        owner=owner,
        acquired_at=now,
        heartbeat_at=now,
        expires_at=now + timedelta(hours=6),
    )
    try:
        db.add(lock)
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise RuntimeError(f"Pipeline already running for {target_date}") from exc

    # -- Write the scheduling log entry -------------------------------------
    log_entry = CrawlLog(
        task="scheduler",
        status="running",
        date=log_date,
        started_at=now,
    )
    try:
        db.add(log_entry)
        db.commit()
    except Exception:
        # The lock is already committed; do not leave it behind if the log
        # row cannot be written.
        db.rollback()
        release_lock(db, lock)
        raise

    error_msg: str | None = None
    crawl_result: dict = {}
    try:
        # Step 1: crawl (try the target date first; fall back to yesterday
        # when the crawl succeeds but finds nothing).
        crawl_result = await crawl_daily(db, target_date)
        logger.info(
            "Pipeline [%s]: crawl %s, found=%d new=%d",
            owner,
            target_date,
            crawl_result.get("found", 0),
            crawl_result.get("new", 0),
        )
        if crawl_result.get("status") == "success" and crawl_result.get("found") == 0:
            # NOTE(review): yesterday_str() takes no argument, so the fallback
            # date does not depend on target_date — confirm this is intended
            # for manually triggered runs on past dates.
            yesterday = yesterday_str()
            logger.info("Pipeline [%s]: falling back to %s", owner, yesterday)
            crawl_result = await crawl_daily(db, yesterday)

        # Step 2: summarize
        summarize_result = await summarize_batch(db, pdf_mode=settings.SUMMARY_PDF_MODE)
        logger.info("Pipeline [%s]: summarize done, result=%s", owner, summarize_result)

        # Step 3: cleanup
        cleanup_result = cleanup_tmp()
        logger.info(
            "Pipeline [%s]: cleanup done, removed=%d",
            owner,
            cleanup_result.get("removed", 0),
        )

        log_entry.status = "success"
        log_entry.papers_found = crawl_result.get("found", 0)
        log_entry.papers_new = crawl_result.get("new", 0)
    except Exception as exc:
        logger.exception("Pipeline [%s] failed", owner)
        log_entry.status = "failed"
        error_msg = truncate_error(exc, limit=2000)
    finally:
        try:
            log_entry.completed_at = utc_now()
            # Compare against None: an empty error string still means failure.
            if error_msg is not None:
                log_entry.error = error_msg
            db.commit()
        except Exception:
            # Reset the session so the lock release below can still use it.
            db.rollback()
            raise
        finally:
            # Always release the lock, even if finalizing the log failed.
            release_lock(db, lock)

    if error_msg is not None:
        return {"status": "failed", "error": error_msg}
    return {"status": "success", "message": "Pipeline completed"}