Files
Rain-Bus 743d69efd0 refactor: extract admin business logic to services, introduce job queue, add derived index helpers
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations)
- Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job)
- Add services/derived.py with FTS5 reindex and paper index deletion helpers
- Refactor scheduler to use job queue instead of direct pipeline calls
- Add heartbeat_at/expires_at to TaskLock for lock health tracking
- Remove DESIGN_REVIEW.md
- Update tests: remove redundant integration tests, add unit tests for new services
2026-06-13 18:31:43 +08:00

144 lines
4.0 KiB
Python

"""调度服务 — APScheduler 每日自动抓取、总结、清理流水线。"""
from __future__ import annotations
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session
from zoneinfo import ZoneInfo
from app.config import settings
from app.database import SessionLocal
from app.services.jobs import create_job, run_job
from app.utils import today_str
logger = logging.getLogger(__name__)
# Module-level holder for the running scheduler, meant to be a process-wide
# singleton. start_scheduler() stores the instance it starts here and
# stop_scheduler() resets it to None; it stays None while nothing is running.
_scheduler: AsyncIOScheduler | None = None
def get_scheduler() -> AsyncIOScheduler | None:
    """Return the module-level scheduler instance, or None.

    The result is None before start_scheduler() has stored an instance and
    after stop_scheduler() has cleared it. Intended for tests and external
    inspection; this function never creates or starts a scheduler.
    """
    current = _scheduler
    return current
def start_scheduler() -> AsyncIOScheduler | None:
"""创建并启动 APScheduler。
约束:
- SCHEDULER_ENABLED=true 才启动。
- APP_WORKERS > 1 时只打印警告(多 worker 下调度器可能重复触发)。
- 使用 task_locks 表防重入。
- 调度时间按 APP_TIMEZONE 时区。
"""
global _scheduler
if not settings.SCHEDULER_ENABLED:
logger.info("Scheduler disabled (SCHEDULER_ENABLED=false)")
return None
if settings.APP_WORKERS > 1:
logger.warning(
"⚠️ APP_WORKERS=%d > 1, scheduler may trigger duplicate tasks. "
"Set APP_WORKERS=1 or SCHEDULER_ENABLED=false.",
settings.APP_WORKERS,
)
tz = ZoneInfo(settings.APP_TIMEZONE)
scheduler = AsyncIOScheduler(timezone=tz)
# 每日流水线:抓取 → 总结 → 清理
trigger = CronTrigger(
hour=settings.SCHEDULE_HOUR,
minute=settings.SCHEDULE_MINUTE,
timezone=tz,
)
scheduler.add_job(
_daily_pipeline,
trigger=trigger,
id="daily_pipeline",
name="daily_pipeline",
replace_existing=True,
max_instances=1,
misfire_grace_time=3600, # 允许迟到 1 小时内补执行
)
# upvote 刷新:每天流水线之后 30 分钟执行,刷新最近 7 天论文的投票数
upvote_trigger = CronTrigger(
hour=settings.SCHEDULE_HOUR,
minute=settings.SCHEDULE_MINUTE + 30,
timezone=tz,
)
scheduler.add_job(
_upvote_refresh,
trigger=upvote_trigger,
id="upvote_refresh",
name="upvote_refresh",
replace_existing=True,
max_instances=1,
misfire_grace_time=3600,
)
scheduler.start()
_scheduler = scheduler
logger.info(
"Scheduler started: %02d:%02d %s",
settings.SCHEDULE_HOUR,
settings.SCHEDULE_MINUTE,
settings.APP_TIMEZONE,
)
return scheduler
def stop_scheduler() -> None:
    """Shut down the running scheduler, if any, and clear the module singleton.

    Shutdown is requested with ``wait=False``, so the call does not wait for
    in-flight jobs to finish. Calling it when no scheduler is stored is a
    no-op and logs nothing.
    """
    global _scheduler
    running = _scheduler
    if not running:
        return
    running.shutdown(wait=False)
    _scheduler = None
    logger.info("Scheduler stopped")
async def _daily_pipeline() -> None:
    """Run the daily fetch -> summarize -> clean-up pipeline through the job queue.

    Creates a ``pipeline_daily`` job owned by ``daily_pipeline``, with
    ``target_date`` set to ``today_str()``, and awaits ``run_job`` on it.

    Errors are handled as follows:
    - A ``RuntimeError`` is treated as "another run is already in progress"
      and logged as a skip warning that includes the exception text.
    - Any other ``Exception`` is logged with its traceback.
    - Neither kind propagates to the scheduler.

    The DB session is closed in every case.
    """
    today = today_str()
    db: Session = SessionLocal()
    try:
        job = create_job(
            db,
            "pipeline_daily",
            owner="daily_pipeline",
            payload={"target_date": today},
        )
        await run_job(db, job.id)
    except RuntimeError as exc:
        # RuntimeError is taken to mean lock contention in the job layer
        # (presumably task_locks -- confirm in app.services.jobs). The message
        # is logged so an unrelated RuntimeError raised inside the pipeline
        # is not silently mislabelled as a concurrent run.
        logger.warning(
            "Daily pipeline already running for %s, skipping (%s)", today, exc
        )
    except Exception:
        logger.exception("Unexpected error in daily pipeline")
    finally:
        db.close()
async def _upvote_refresh() -> None:
    """Refresh upvote counts for recent papers through the job queue.

    Creates a ``refresh_upvotes`` job owned by ``upvote_refresh`` with an
    empty payload, so the time window is decided by the job handler. It runs
    the job and logs the ``status`` and ``updated`` fields of the result
    mapping.

    Errors are handled as follows:
    - A ``RuntimeError`` is logged as a skip warning with the exception text.
      This is the same treatment the daily pipeline gives it: presumably lock
      contention.
    - Any other ``Exception`` is logged with its traceback.

    The DB session is closed in every case.
    """
    db: Session = SessionLocal()
    try:
        job = create_job(db, "refresh_upvotes", owner="upvote_refresh", payload={})
        result = await run_job(db, job.id)
        logger.info(
            "Upvote refresh completed: status=%s updated=%d",
            result.get("status"),
            result.get("updated", 0),
        )
    except RuntimeError as exc:
        # Without this branch an overlapping run (e.g. with APP_WORKERS > 1)
        # would be reported as an "Unexpected error" with a full traceback.
        logger.warning("Upvote refresh already running, skipping (%s)", exc)
    except Exception:
        logger.exception("Unexpected error in upvote refresh")
    finally:
        db.close()