Files
Rain-Bus 743d69efd0 refactor: extract admin business logic to services, introduce job queue, add derived index helpers
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations)
- Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job)
- Add services/derived.py with FTS5 reindex and paper index deletion helpers
- Refactor scheduler to use job queue instead of direct pipeline calls
- Add heartbeat_at/expires_at to TaskLock for lock health tracking
- Remove DESIGN_REVIEW.md
- Update tests: remove redundant integration tests, add unit tests for new services
2026-06-13 18:31:43 +08:00

141 lines
4.3 KiB
Python

"""派生数据维护 — FTS5 / ChromaDB 等可重建索引。"""
from __future__ import annotations
import logging
from sqlalchemy import select, text
from sqlalchemy.orm import Session
from app.models import Paper
logger = logging.getLogger(__name__)
def _summary_text(paper: Paper) -> str:
    """Flatten a paper's summary into one space-separated string.

    Returns an empty string when the paper has no (truthy) summary. Summary
    fields that are empty or ``None`` are left out of the result.
    """
    record = paper.summary
    if record:
        # The order of these names is the order of the text in the result.
        field_names = (
            "one_line",
            "motivation_problem",
            "motivation_goal",
            "method_overview",
            "method_key_idea",
            "results_main_json",
        )
        values = [getattr(record, name) for name in field_names]
        return " ".join(filter(None, values))
    return ""
def delete_fts_paper(db: Session, paper_id: int) -> None:
    """Delete one paper's row from the ``papers_fts`` FTS5 table.

    The FTS5 table uses ``papers.id`` as its rowid, so the paper id is the
    lookup key. The statement is executed on ``db`` but not committed here.
    """
    statement = text("DELETE FROM papers_fts WHERE rowid = :paper_id")
    bound_values = {"paper_id": paper_id}
    db.execute(statement, bound_values)
def delete_paper_indexes(db: Session, *, paper_id: int, arxiv_id: str) -> None:
    """Remove every derived index entry for a single paper.

    Cleanup is best-effort: a step that fails is logged as a warning (with
    traceback) and the remaining steps still run, so the caller's main
    deletion is never blocked by an index failure.

    Args:
        db: Session used for the FTS5 delete.
        paper_id: ``papers.id`` of the paper; the FTS5 rowid.
        arxiv_id: Identifier passed to the embedder and used in log messages.
    """

    def _drop_fts_row() -> None:
        delete_fts_paper(db, paper_id)

    def _drop_vector_entry() -> None:
        # Imported lazily so that an import failure is handled like any
        # other cleanup failure of this step.
        from app.services.embedder import delete_paper

        delete_paper(arxiv_id)

    cleanup_steps = (
        ("Failed to clean FTS index for %s", _drop_fts_row),
        ("Failed to clean ChromaDB index for %s", _drop_vector_entry),
    )
    for failure_message, run_step in cleanup_steps:
        try:
            run_step()
        except Exception:
            logger.warning(failure_message, arxiv_id, exc_info=True)
def reindex_paper_fts(db: Session, paper: Paper) -> None:
    """Rebuild one paper's FTS5 row from the authoritative database record.

    Authors are joined in ascending ``position`` order (a missing position
    sorts as 0). Missing titles and abstract are stored as empty strings.
    The statements are executed on ``db`` but not committed here.
    """

    def _author_rank(author):
        return author.position or 0

    ordered_authors = sorted(paper.authors, key=_author_rank)
    author_names = ", ".join(author.name for author in ordered_authors)
    tag_names = ", ".join(entry.tag for entry in paper.tags)

    # Remove the paper's current FTS row (if any) before inserting the new one.
    delete_fts_paper(db, paper.id)

    insert_row = text(
        """
INSERT INTO papers_fts(
rowid, title_en, title_zh, abstract, authors, tags, summary_text
)
VALUES (
:id, :title_en, :title_zh, :abstract, :authors, :tags, :summary_text
)
"""
    )
    row_values = {
        "id": paper.id,
        "title_en": paper.title_en or "",
        "title_zh": paper.title_zh or "",
        "abstract": paper.abstract or "",
        "authors": author_names,
        "tags": tag_names,
        "summary_text": _summary_text(paper),
    }
    db.execute(insert_row, row_values)
def reindex_fts(db: Session, paper_ids: list[int] | None = None) -> dict:
    """Rebuild the FTS5 index for all papers or for a chosen subset.

    Args:
        db: Session used for reading papers and writing index rows. It is
            committed before this function returns.
        paper_ids: ``None`` rebuilds the whole index; the FTS table is
            cleared first. A list restricts the rebuild to those paper ids,
            and an empty list rebuilds nothing.

    Returns:
        ``{"status": "success", "indexed": <number of papers reindexed>}``.
    """
    full_rebuild = paper_ids is None

    query = select(Paper)
    if not full_rebuild:
        # Compare against None rather than truthiness: an empty id list must
        # select no papers, not fall through to selecting every paper.
        query = query.where(Paper.id.in_(paper_ids))
    papers = db.execute(query).scalars().all()

    if full_rebuild:
        # Clear the table in one statement; rows are re-inserted below.
        db.execute(text("DELETE FROM papers_fts"))

    for paper in papers:
        reindex_paper_fts(db, paper)
    count = len(papers)

    db.commit()
    logger.info("FTS reindexed: %d papers", count)
    return {"status": "success", "indexed": count}
def reindex_chroma(db: Session) -> dict:
    """Rebuild the ChromaDB semantic index from the authoritative database.

    Only papers that have a summary are selected. Each paper is indexed
    independently: a failure is logged as a warning, recorded in the result,
    and does not stop the remaining papers.

    Args:
        db: Session used to load the papers.

    Returns:
        A dict with ``status`` (``"success"`` when no paper failed, otherwise
        ``"partial"``), ``indexed`` (number of papers indexed) and ``errors``
        (list of ``"<arxiv_id>: <exception>"`` strings, or ``None`` when
        there were no failures).
    """
    from app.services.embedder import index_paper

    papers = db.execute(select(Paper).where(Paper.summary.has())).scalars().all()
    indexed = 0
    errors: list[str] = []
    for paper in papers:
        try:
            summary = paper.summary
            # Null summary fields are normalized to "" just like the title
            # fields, so the payload never carries None for a text field.
            texts_dict = {
                "arxiv_id": paper.arxiv_id,
                "title_zh": paper.title_zh or "",
                "title_en": paper.title_en or "",
                "tags": " ".join(t.tag for t in paper.tags),
                "one_line": (summary.one_line or "") if summary else "",
                "motivation_problem": (
                    (summary.motivation_problem or "") if summary else ""
                ),
                "method_key_idea": (
                    (summary.method_key_idea or "") if summary else ""
                ),
                "paper_date": paper.paper_date.isoformat() if paper.paper_date else "",
            }
            index_paper(paper.arxiv_id, texts_dict)
            indexed += 1
        except Exception as exc:
            errors.append(f"{paper.arxiv_id}: {exc}")
            logger.warning(
                "Failed to reindex ChromaDB for %s",
                paper.arxiv_id,
                exc_info=True,
            )
    return {
        "status": "success" if not errors else "partial",
        "indexed": indexed,
        "errors": errors or None,
    }