743d69efd0
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations) - Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job) - Add services/derived.py with FTS5 reindex and paper index deletion helpers - Refactor scheduler to use job queue instead of direct pipeline calls - Add heartbeat_at/expires_at to TaskLock for lock health tracking - Remove DESIGN_REVIEW.md - Update tests: remove redundant integration tests, add unit tests for new services
144 lines
4.0 KiB
Python
144 lines
4.0 KiB
Python
"""调度服务 — APScheduler 每日自动抓取、总结、清理流水线。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy.orm import Session
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from app.config import settings
|
|
from app.database import SessionLocal
|
|
from app.services.jobs import create_job, run_job
|
|
from app.utils import today_str
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Module-level scheduler instance, kept here so the process holds a single scheduler.
|
|
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler | None:
    """Return the module-level scheduler, or ``None`` when none is stored.

    Intended for tests and external health checks that need to inspect the
    scheduler without starting or stopping it.
    """
    current = _scheduler
    return current
|
|
|
|
|
|
def _shift_clock_time(hour: int, minute: int, offset_minutes: int) -> tuple[int, int]:
    """Return the ``(hour, minute)`` wall-clock time ``offset_minutes`` after ``hour:minute``.

    The result wraps around midnight, so it is always a valid cron hour (0-23)
    and cron minute (0-59).
    """
    total = (hour * 60 + minute + offset_minutes) % (24 * 60)
    return divmod(total, 60)


def start_scheduler() -> AsyncIOScheduler | None:
    """Create and start the APScheduler instance.

    Behaviour:
    - Returns ``None`` without starting anything unless
      ``settings.SCHEDULER_ENABLED`` is truthy.
    - When ``settings.APP_WORKERS > 1`` only a warning is logged; the
      scheduler is still started, so jobs may fire once per worker.
    - Triggers are evaluated in the ``settings.APP_TIMEZONE`` time zone.
    - Registers two daily cron jobs: ``daily_pipeline`` at
      ``SCHEDULE_HOUR:SCHEDULE_MINUTE`` and ``upvote_refresh`` 30 minutes later.
    - NOTE(review): re-entrancy protection (task_locks) is expected to live in
      the job layer; it is not visible in this function.

    Returns:
        The started scheduler, which is also stored in the module-level
        ``_scheduler``, or ``None`` when scheduling is disabled.
    """
    global _scheduler

    if not settings.SCHEDULER_ENABLED:
        logger.info("Scheduler disabled (SCHEDULER_ENABLED=false)")
        return None

    if settings.APP_WORKERS > 1:
        logger.warning(
            "⚠️ APP_WORKERS=%d > 1, scheduler may trigger duplicate tasks. "
            "Set APP_WORKERS=1 or SCHEDULER_ENABLED=false.",
            settings.APP_WORKERS,
        )

    tz = ZoneInfo(settings.APP_TIMEZONE)
    scheduler = AsyncIOScheduler(timezone=tz)

    # Daily pipeline job.
    trigger = CronTrigger(
        hour=settings.SCHEDULE_HOUR,
        minute=settings.SCHEDULE_MINUTE,
        timezone=tz,
    )
    scheduler.add_job(
        _daily_pipeline,
        trigger=trigger,
        id="daily_pipeline",
        name="daily_pipeline",
        replace_existing=True,
        max_instances=1,
        misfire_grace_time=3600,  # still run if the trigger fires up to 1 hour late
    )

    # Upvote refresh: 30 minutes after the daily pipeline. Adding 30 directly
    # to the minute field would produce minute >= 60 (rejected by CronTrigger)
    # whenever SCHEDULE_MINUTE >= 30, so carry into the hour and wrap at midnight.
    upvote_hour, upvote_minute = _shift_clock_time(
        settings.SCHEDULE_HOUR, settings.SCHEDULE_MINUTE, 30
    )
    upvote_trigger = CronTrigger(
        hour=upvote_hour,
        minute=upvote_minute,
        timezone=tz,
    )
    scheduler.add_job(
        _upvote_refresh,
        trigger=upvote_trigger,
        id="upvote_refresh",
        name="upvote_refresh",
        replace_existing=True,
        max_instances=1,
        misfire_grace_time=3600,
    )

    scheduler.start()
    _scheduler = scheduler
    logger.info(
        "Scheduler started: %02d:%02d %s",
        settings.SCHEDULE_HOUR,
        settings.SCHEDULE_MINUTE,
        settings.APP_TIMEZONE,
    )
    return scheduler
|
|
|
|
|
|
def stop_scheduler() -> None:
    """Shut down the module-level scheduler, if one is stored.

    Does nothing when no scheduler is stored. Otherwise calls
    ``shutdown(wait=False)`` on it, clears the module-level reference and
    logs that the scheduler stopped.
    """
    global _scheduler

    active = _scheduler
    if not active:
        return

    active.shutdown(wait=False)
    _scheduler = None
    logger.info("Scheduler stopped")
|
|
|
|
|
|
async def _daily_pipeline() -> None:
    """Scheduled entry point for the daily pipeline.

    Creates a ``pipeline_daily`` job (owner ``daily_pipeline``) whose payload
    carries today's date as ``target_date`` and runs it via ``run_job``.

    No exception leaves this function:
    - ``RuntimeError`` is logged as a warning that the pipeline is already
      running for that date (presumably raised by the job layer when a run is
      already in progress — confirm against ``app.services.jobs``).
    - Any other exception is logged with its traceback.

    The database session is closed in all cases.
    """
    target_date = today_str()

    session: Session = SessionLocal()
    try:
        job_payload = {"target_date": target_date}
        queued = create_job(
            session,
            "pipeline_daily",
            owner="daily_pipeline",
            payload=job_payload,
        )
        await run_job(session, queued.id)
    except RuntimeError:
        logger.warning("Daily pipeline already running for %s, skipping", target_date)
    except Exception:
        logger.exception("Unexpected error in daily pipeline")
    finally:
        session.close()
|
|
|
|
|
|
async def _upvote_refresh() -> None:
    """Scheduled entry point for the upvote refresh.

    Creates a ``refresh_upvotes`` job (owner ``upvote_refresh``, empty
    payload), runs it via ``run_job`` and logs the ``status`` and ``updated``
    entries of the returned mapping (``updated`` defaults to 0 when absent).

    Any exception is logged with its traceback and not re-raised; the
    database session is closed in all cases.
    """
    session: Session = SessionLocal()
    try:
        queued = create_job(session, "refresh_upvotes", owner="upvote_refresh", payload={})
        outcome = await run_job(session, queued.id)
        status = outcome.get("status")
        updated_count = outcome.get("updated", 0)
        logger.info(
            "Upvote refresh completed: status=%s updated=%d",
            status,
            updated_count,
        )
    except Exception:
        logger.exception("Unexpected error in upvote refresh")
    finally:
        session.close()
|