"""Pipeline service: shared orchestration for crawl -> summarize -> cleanup.

Used by both the admin manual trigger and the scheduler's periodic runs.
"""

from __future__ import annotations

import logging
from datetime import date as date_type

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.config import settings
from app.models import CrawlLog, TaskLock
from app.services.cleaner import cleanup_tmp
from app.services.crawler import crawl_daily
from app.services.summarizer import summarize_batch
from app.utils import release_lock, truncate_error, utc_now, yesterday_str

logger = logging.getLogger(__name__)


def acquire_lock(db: Session, task: str, lock_key: str, owner: str) -> TaskLock:
    """Insert a TaskLock row, raising RuntimeError on a lock conflict.

    Shared entry point for operations that must not run re-entrantly
    (crawl, pipeline, ...).

    Args:
        db: Database session.
        task: Task name stored on the lock row.
        lock_key: Key stored on the lock row.
        owner: Identifier of the caller taking the lock.

    Returns:
        The committed TaskLock instance.

    Raises:
        RuntimeError: If committing the lock row raises IntegrityError.
    """
    lock = TaskLock(
        task=task,
        lock_key=lock_key,
        status="running",
        owner=owner,
        acquired_at=utc_now(),
    )
    try:
        db.add(lock)
        db.commit()
    except IntegrityError as exc:
        # Reset the session so the caller can keep using it, and keep the
        # database error attached as the cause for debugging.
        db.rollback()
        raise RuntimeError(f"{task} already running for {lock_key}") from exc
    return lock


async def run_crawl(db: Session, target_date: str, owner: str = "admin_crawl") -> dict:
    """Run a single crawl guarded by a re-entrancy lock.

    Args:
        db: Database session.
        target_date: Target date, YYYY-MM-DD.
        owner: Identifier of the caller.

    Returns:
        The raw return value of crawl_daily().

    Raises:
        RuntimeError: If a crawl lock for ``target_date`` already exists.
    """
    lock = acquire_lock(db, "crawl", target_date, owner)
    try:
        return await crawl_daily(db, target_date)
    finally:
        release_lock(db, lock)


def _commit_or_rollback(db: Session) -> None:
    """Commit the session; on failure roll it back and re-raise."""
    try:
        db.commit()
    except Exception:
        # Leave the session usable for whatever cleanup the caller still
        # has to perform (e.g. releasing a lock).
        db.rollback()
        raise


async def _crawl_with_fallback(db: Session, target_date: str, owner: str) -> dict:
    """Crawl ``target_date``; if it succeeds with zero papers, crawl yesterday."""
    crawl_result = await crawl_daily(db, target_date)
    logger.info(
        "Pipeline [%s]: crawl %s, found=%d new=%d",
        owner,
        target_date,
        crawl_result.get("found", 0),
        crawl_result.get("new", 0),
    )
    if crawl_result.get("status") == "success" and crawl_result.get("found") == 0:
        # NOTE(review): the fallback date comes from yesterday_str(), not from
        # target_date - 1 day; confirm this is intended when target_date is
        # not today.
        yesterday = yesterday_str()
        logger.info("Pipeline [%s]: falling back to %s", owner, yesterday)
        crawl_result = await crawl_daily(db, yesterday)
    return crawl_result


async def run_pipeline(db: Session, target_date: str, owner: str) -> dict:
    """Run the full pipeline: crawl -> summarize -> cleanup.

    Guards against re-entrancy with a TaskLock row and records the run in a
    CrawlLog row. Failures of the pipeline steps are caught, logged and
    reported in the return value rather than raised.

    Args:
        db: Database session.
        target_date: Target date, YYYY-MM-DD.
        owner: Identifier of the caller (e.g. "admin_trigger" /
            "daily_pipeline").

    Returns:
        ``{"status": "success", "message": "Pipeline completed"}`` on success,
        ``{"status": "failed", "error": <truncated error text>}`` on failure.

    Raises:
        ValueError: If ``target_date`` is not a valid ISO date. Raised before
            the lock is taken so a bad argument cannot leave a stale lock.
        RuntimeError: If a pipeline lock for ``target_date`` already exists.
    """
    # Validate the date before acquiring the lock: a parse error after the
    # lock row is committed would leave the lock behind.
    run_date = date_type.fromisoformat(target_date)
    started_at = utc_now()

    # -- Acquire the lock --------------------------------------------------
    try:
        lock = acquire_lock(db, "scheduler", f"pipeline-{target_date}", owner)
    except RuntimeError as exc:
        raise RuntimeError(f"Pipeline already running for {target_date}") from exc

    error_msg = None
    try:
        # -- Write the scheduler log entry ---------------------------------
        # NOTE(review): the log keeps target_date even when the crawl falls
        # back to yesterday.
        log_entry = CrawlLog(
            task="scheduler",
            status="running",
            date=run_date,
            started_at=started_at,
        )
        db.add(log_entry)
        _commit_or_rollback(db)

        try:
            # Step 1: crawl (try the target date first, fall back to
            # yesterday when nothing was found).
            crawl_result = await _crawl_with_fallback(db, target_date, owner)

            # Step 2: summarize.
            summarize_result = await summarize_batch(
                db, pdf_mode=settings.SUMMARY_PDF_MODE
            )
            logger.info(
                "Pipeline [%s]: summarize done, result=%s", owner, summarize_result
            )

            # Step 3: cleanup.
            cleanup_result = cleanup_tmp()
            logger.info(
                "Pipeline [%s]: cleanup done, removed=%d",
                owner,
                cleanup_result.get("removed", 0),
            )

            log_entry.status = "success"
            log_entry.papers_found = crawl_result.get("found", 0)
            log_entry.papers_new = crawl_result.get("new", 0)
        except Exception as exc:
            logger.exception("Pipeline [%s] failed", owner)
            log_entry.status = "failed"
            error_msg = truncate_error(exc, limit=2000)
        finally:
            # Runs on every exit path, including exceptions that are not
            # Exception subclasses, so the log always gets a completion time.
            log_entry.completed_at = utc_now()
            if error_msg:
                log_entry.error = error_msg
            _commit_or_rollback(db)
    finally:
        # Release the lock on every path after acquisition, including
        # failures while writing the log entry.
        release_lock(db, lock)

    if error_msg:
        return {"status": "failed", "error": error_msg}
    return {"status": "success", "message": "Pipeline completed"}