Files
Rain-Bus 743d69efd0 refactor: extract admin business logic to services, introduce job queue, add derived index helpers
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations)
- Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job)
- Add services/derived.py with FTS5 reindex and paper index deletion helpers
- Refactor scheduler to use job queue instead of direct pipeline calls
- Add heartbeat_at/expires_at to TaskLock for lock health tracking
- Remove DESIGN_REVIEW.md
- Update tests: remove redundant integration tests, add unit tests for new services
2026-06-13 18:31:43 +08:00

346 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""SQLAlchemy ORM 模型 — papers, authors, tags, summaries, user data, logs, locks。"""
from enum import StrEnum
from sqlalchemy import (
Boolean,
Column,
Date,
DateTime,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
select,
)
from sqlalchemy.orm import joinedload, relationship
from app.database import Base
# ── 枚举 ────────────────────────────────────────────────────────────────
class SummaryState(StrEnum):
"""总结状态枚举 — 对应 summary_status.status 列。"""
PENDING = "pending"
PROCESSING = "processing"
DONE = "done"
FAILED = "failed"
PERMANENT_FAILURE = "permanent_failure"
class JobStatus(StrEnum):
"""后台任务状态枚举 — 对应 jobs.status 列。"""
QUEUED = "queued"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
STALE = "stale"
CANCELLED = "cancelled"
class JobEventStatus(StrEnum):
"""任务阶段事件状态枚举 — 对应 job_events.status 列。"""
STARTED = "started"
SUCCESS = "success"
FAILED = "failed"
INFO = "info"
# ── papers ──────────────────────────────────────────────────────────────
class Paper(Base):
    """ORM model for the ``papers`` table.

    A row is keyed by a surrogate integer ``id`` and additionally carries a
    unique, indexed ``arxiv_id``. Every relationship declared here uses the
    ``all, delete-orphan`` cascade, so related rows follow the paper when it
    is deleted through the ORM. ``authors`` and ``tags`` are collections;
    the remaining relationships are scalar (``uselist=False``).
    """

    __tablename__ = "papers"

    # Identity.
    id = Column(Integer, primary_key=True, autoincrement=True)
    arxiv_id = Column(String, unique=True, nullable=False, index=True)

    # Descriptive fields; only title_en, paper_date and crawled_at are
    # required (nullable=False).
    title_en = Column(String, nullable=False)
    title_zh = Column(String)
    abstract = Column(Text)
    published_at = Column(Date)
    paper_date = Column(Date, nullable=False, index=True)
    crawled_at = Column(DateTime, nullable=False)
    upvotes = Column(Integer, default=0)

    # Link columns.
    hf_url = Column(String)
    arxiv_url = Column(String)
    pdf_url = Column(String)

    # Summary-related columns (presumably file paths and a quality label —
    # confirm against the code that writes them).
    summary_path = Column(String)
    raw_output_path = Column(String)
    summary_quality = Column(String)

    # Collection relationships.
    authors = relationship(
        "PaperAuthor",
        cascade="all, delete-orphan",
        back_populates="paper",
    )
    tags = relationship(
        "PaperTag",
        cascade="all, delete-orphan",
        back_populates="paper",
    )

    # Scalar relationships: at most one related row per paper.
    summary = relationship(
        "PaperSummary",
        uselist=False,
        cascade="all, delete-orphan",
        back_populates="paper",
    )
    summary_status = relationship(
        "SummaryStatus",
        uselist=False,
        cascade="all, delete-orphan",
        back_populates="paper",
    )
    bookmark = relationship(
        "UserBookmark",
        uselist=False,
        cascade="all, delete-orphan",
        back_populates="paper",
    )
    reading_status = relationship(
        "UserReadingStatus",
        uselist=False,
        cascade="all, delete-orphan",
        back_populates="paper",
    )
    note = relationship(
        "UserNote",
        uselist=False,
        cascade="all, delete-orphan",
        back_populates="paper",
    )
# ── paper_authors ───────────────────────────────────────────────────────
class PaperAuthor(Base):
    """ORM model for the ``paper_authors`` table.

    One row per (paper, author name); the pair is enforced unique. The
    foreign key to ``papers.id`` is declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "paper_authors"
    __table_args__ = (UniqueConstraint("paper_id", "name"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    name = Column(String, nullable=False)
    # Defaults to 0 (presumably the author's order within the paper — confirm).
    position = Column(Integer, default=0)

    paper = relationship("Paper", back_populates="authors")
# ── paper_tags ──────────────────────────────────────────────────────────
class PaperTag(Base):
    """ORM model for the ``paper_tags`` table.

    The (paper_id, tag, source) triple is enforced unique, so the same tag
    text may appear on a paper once per source. The foreign key to
    ``papers.id`` is declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "paper_tags"
    __table_args__ = (UniqueConstraint("paper_id", "tag", "source"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    tag = Column(String, nullable=False)
    # Rows created without an explicit source get "hf".
    source = Column(String, default="hf")

    paper = relationship("Paper", back_populates="tags")
# ── paper_summaries ─────────────────────────────────────────────────────
class PaperSummary(Base):
    """ORM model for the ``paper_summaries`` table.

    ``paper_id`` is both the primary key and a foreign key to ``papers.id``
    (``ON DELETE CASCADE``), so there is at most one summary row per paper.
    Only ``full_json`` and ``updated_at`` are required; every other field is
    nullable. Columns suffixed ``_json`` are plain ``Text`` (presumably
    JSON-encoded strings — confirm against the code that writes them).
    """

    __tablename__ = "paper_summaries"

    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        primary_key=True,
    )

    # Headline fields.
    one_line = Column(Text)
    difficulty = Column(String)
    prerequisites_json = Column(Text)

    # Motivation section.
    motivation_problem = Column(Text)
    motivation_goal = Column(Text)
    motivation_gap = Column(Text)

    # Method section.
    method_overview = Column(Text)
    method_key_idea = Column(Text)
    method_steps_json = Column(Text)
    method_novelty = Column(Text)

    # Results section.
    results_main_json = Column(Text)
    results_benchmarks_json = Column(Text)

    # Assessment fields.
    limitations_json = Column(Text)
    weaknesses_json = Column(Text)
    future_work_json = Column(Text)
    reproducibility = Column(String)
    figures_json = Column(Text)

    # Required columns.
    full_json = Column(Text, nullable=False)
    updated_at = Column(DateTime, nullable=False)

    paper = relationship("Paper", back_populates="summary")
# ── summary_status ──────────────────────────────────────────────────────
class SummaryStatus(Base):
    """ORM model for the ``summary_status`` table.

    ``paper_id`` is enforced unique, so each paper has at most one status
    row; the foreign key to ``papers.id`` is declared with
    ``ON DELETE CASCADE``. ``status`` holds one of the ``SummaryState``
    values and is indexed.
    """

    __tablename__ = "summary_status"
    __table_args__ = (UniqueConstraint("paper_id"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer, ForeignKey("papers.id", ondelete="CASCADE"), nullable=False
    )
    # The default is taken from the enum instead of a bare literal, in the
    # same way Job.status uses JobStatus.QUEUED. StrEnum members are str
    # instances, so the stored value is still "pending".
    status = Column(
        String, nullable=False, default=SummaryState.PENDING, index=True
    )
    quality = Column(String)

    # Failure bookkeeping.
    error_type = Column(String)
    error = Column(Text)
    retry_count = Column(Integer, default=0)
    raw_output_saved = Column(Boolean, default=False)

    # Both timestamps are nullable.
    started_at = Column(DateTime)
    completed_at = Column(DateTime)

    paper = relationship("Paper", back_populates="summary_status")
# ── crawl_logs ──────────────────────────────────────────────────────────
class CrawlLog(Base):
    """ORM model for the ``crawl_logs`` table.

    Each row records one run of a named task: its status, optional date and
    paper counts, an optional error, and start/completion timestamps. Only
    ``task``, ``status`` and ``started_at`` are required.
    """

    __tablename__ = "crawl_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task = Column(String, nullable=False)
    status = Column(String, nullable=False)
    date = Column(Date)
    papers_found = Column(Integer)
    papers_new = Column(Integer)
    error = Column(Text)
    # Task-specific metadata JSON (e.g. cleanup: {scanned, removed}).
    details_json = Column(Text)
    started_at = Column(DateTime, nullable=False)
    completed_at = Column(DateTime)
# ── task_locks ──────────────────────────────────────────────────────────
class TaskLock(Base):
    """ORM model for the ``task_locks`` table.

    ``task``, ``lock_key``, ``status`` and ``acquired_at`` are required; the
    owner and the remaining timestamps are nullable. No uniqueness
    constraint or index is declared on this table beyond the primary key.
    """

    __tablename__ = "task_locks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task = Column(String, nullable=False)
    lock_key = Column(String, nullable=False)
    status = Column(String, nullable=False)
    owner = Column(String)

    # Lifecycle timestamps. heartbeat_at / expires_at presumably support
    # lock health tracking — confirm against the lock service.
    acquired_at = Column(DateTime, nullable=False)
    heartbeat_at = Column(DateTime)
    expires_at = Column(DateTime)
    released_at = Column(DateTime)
# ── jobs / job_events ──────────────────────────────────────────────────
class Job(Base):
    """ORM model for the ``jobs`` table.

    ``type`` and ``status`` are required and indexed; ``status`` defaults to
    ``JobStatus.QUEUED``. ``events`` is the collection of related
    ``JobEvent`` rows and uses the ``all, delete-orphan`` cascade, so events
    are removed with their job when it is deleted through the ORM.
    """

    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    type = Column(String, nullable=False, index=True)
    status = Column(
        String, nullable=False, default=JobStatus.QUEUED, index=True
    )
    owner = Column(String)

    # Text columns (presumably JSON-encoded input and output — confirm
    # against the job service).
    payload_json = Column(Text)
    result_json = Column(Text)
    error = Column(Text)

    # Lifecycle timestamps; only created_at is required.
    created_at = Column(DateTime, nullable=False)
    started_at = Column(DateTime)
    heartbeat_at = Column(DateTime)
    completed_at = Column(DateTime)

    events = relationship(
        "JobEvent",
        cascade="all, delete-orphan",
        back_populates="job",
    )
class JobEvent(Base):
    """ORM model for the ``job_events`` table.

    Each row belongs to one job through an indexed foreign key to
    ``jobs.id`` declared with ``ON DELETE CASCADE``. ``stage``, ``status``
    and ``created_at`` are required; ``message`` and ``payload_json`` are
    optional.
    """

    __tablename__ = "job_events"

    id = Column(Integer, primary_key=True, autoincrement=True)
    job_id = Column(
        Integer,
        ForeignKey("jobs.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    stage = Column(String, nullable=False)
    status = Column(String, nullable=False)
    message = Column(Text)
    payload_json = Column(Text)
    created_at = Column(DateTime, nullable=False)

    job = relationship("Job", back_populates="events")
# ── user data ──────────────────────────────────────────────────────────
class UserBookmark(Base):
    """ORM model for the ``user_bookmarks`` table.

    ``paper_id`` is enforced unique, so a paper has at most one bookmark
    row; the foreign key to ``papers.id`` is declared with
    ``ON DELETE CASCADE``.
    """

    __tablename__ = "user_bookmarks"
    __table_args__ = (UniqueConstraint("paper_id"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        nullable=False,
    )
    # Optional free text stored on the bookmark itself.
    note = Column(Text)
    created_at = Column(DateTime, nullable=False)

    paper = relationship("Paper", back_populates="bookmark")
class UserReadingStatus(Base):
    """ORM model for the ``user_reading_status`` table.

    ``paper_id`` is enforced unique, so a paper has at most one
    reading-status row; the foreign key to ``papers.id`` is declared with
    ``ON DELETE CASCADE``.
    """

    __tablename__ = "user_reading_status"
    __table_args__ = (UniqueConstraint("paper_id"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        nullable=False,
    )
    # Indexed; rows created without an explicit status get "unread".
    status = Column(String, nullable=False, default="unread", index=True)
    updated_at = Column(DateTime, nullable=False)

    paper = relationship("Paper", back_populates="reading_status")
class UserNote(Base):
    """ORM model for the ``user_notes`` table.

    ``paper_id`` is enforced unique, so a paper has at most one note row;
    the foreign key to ``papers.id`` is declared with ``ON DELETE CASCADE``.
    ``content`` and both timestamps are required.
    """

    __tablename__ = "user_notes"
    __table_args__ = (UniqueConstraint("paper_id"),)

    id = Column(Integer, primary_key=True, autoincrement=True)
    paper_id = Column(
        Integer,
        ForeignKey("papers.id", ondelete="CASCADE"),
        nullable=False,
    )
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)

    paper = relationship("Paper", back_populates="note")
# ── data_delete_jobs ───────────────────────────────────────────────────
class DataDeleteJob(Base):
    """ORM model for the ``data_delete_jobs`` table.

    Each row records one delete operation over a date range: the range
    bounds, whether notes were included, a paper count, a status, an
    optional error, and start/completion timestamps.
    """

    __tablename__ = "data_delete_jobs"

    id = Column(Integer, primary_key=True, autoincrement=True)

    # Requested range; both bounds are required.
    date_start = Column(Date, nullable=False)
    date_end = Column(Date, nullable=False)
    include_notes = Column(Boolean, default=True)

    # Outcome.
    paper_count = Column(Integer, default=0)
    status = Column(String, nullable=False)
    error = Column(Text)
    started_at = Column(DateTime, nullable=False)
    completed_at = Column(DateTime)
# ── Common joinedload option sets ───────────────────────────────────────
# Shared here so routes/services do not each repeat
# .options(joinedload(Paper.authors), ...).
# Default set: authors, tags and summary status.
PAPER_DEFAULT_LOAD = tuple(
    joinedload(rel)
    for rel in (
        Paper.authors,
        Paper.tags,
        Paper.summary_status,
    )
)
# Full set: the default relationships plus bookmark and reading status.
# Paper.summary and Paper.note are not included in either set.
PAPER_FULL_LOAD = tuple(
    joinedload(rel)
    for rel in (
        Paper.authors,
        Paper.tags,
        Paper.summary_status,
        Paper.bookmark,
        Paper.reading_status,
    )
)
def get_paper_by_arxiv_id(db, arxiv_id: str, *, load=PAPER_DEFAULT_LOAD):
    """Return the paper whose ``arxiv_id`` matches, or ``None`` if none does.

    Args:
        db: Object exposing ``execute(stmt)`` (presumably a SQLAlchemy
            ``Session`` — confirm against callers).
        arxiv_id: Value compared for equality against ``Paper.arxiv_id``.
        load: Iterable of loader options applied to the query; defaults to
            ``PAPER_DEFAULT_LOAD``.

    Returns:
        The matching ``Paper`` instance, or ``None`` when no row matches.
    """
    query = select(Paper).options(*load).where(Paper.arxiv_id == arxiv_id)
    # unique() collapses the repeated parent rows that joined eager loading
    # of collections produces.
    deduped = db.execute(query).unique()
    return deduped.scalar_one_or_none()
def get_paper_by_id(db, paper_id: int, *, load=PAPER_DEFAULT_LOAD):
    """Return the paper with primary key ``paper_id``, or ``None`` if absent.

    Args:
        db: Object exposing ``execute(stmt)`` (presumably a SQLAlchemy
            ``Session`` — confirm against callers).
        paper_id: Value compared for equality against ``Paper.id``.
        load: Iterable of loader options applied to the query; defaults to
            ``PAPER_DEFAULT_LOAD``.

    Returns:
        The matching ``Paper`` instance, or ``None`` when no row matches.
    """
    query = select(Paper).options(*load).where(Paper.id == paper_id)
    # unique() collapses the repeated parent rows that joined eager loading
    # of collections produces.
    deduped = db.execute(query).unique()
    return deduped.scalar_one_or_none()