743d69efd0
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations) - Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job) - Add services/derived.py with FTS5 reindex and paper index deletion helpers - Refactor scheduler to use job queue instead of direct pipeline calls - Add heartbeat_at/expires_at to TaskLock for lock health tracking - Remove DESIGN_REVIEW.md - Update tests: remove redundant integration tests, add unit tests for new services
141 lines
4.3 KiB
Python
141 lines
4.3 KiB
Python
"""派生数据维护 — FTS5 / ChromaDB 等可重建索引。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from sqlalchemy import select, text
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models import Paper
|
|
|
|
# Module logger: reports index-maintenance progress (info) and best-effort failures (warning).
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
def _summary_text(paper: Paper) -> str:
|
|
summary = paper.summary
|
|
if not summary:
|
|
return ""
|
|
parts = [
|
|
summary.one_line,
|
|
summary.motivation_problem,
|
|
summary.motivation_goal,
|
|
summary.method_overview,
|
|
summary.method_key_idea,
|
|
summary.results_main_json,
|
|
]
|
|
return " ".join(p for p in parts if p)
|
|
|
|
|
|
def delete_fts_paper(db: Session, paper_id: int) -> None:
    """Remove one paper's row from the ``papers_fts`` FTS5 table.

    The FTS5 table uses ``papers.id`` as its rowid, so the row is addressed
    directly by ``paper_id``. The statement is only executed here; committing
    is left to the caller.
    """
    delete_stmt = text("DELETE FROM papers_fts WHERE rowid = :paper_id")
    db.execute(delete_stmt, {"paper_id": paper_id})
|
|
|
|
|
|
def delete_paper_indexes(db: Session, *, paper_id: int, arxiv_id: str) -> None:
    """Remove every derived index entry for a single paper, best-effort.

    Runs the FTS5 cleanup first, then the ChromaDB cleanup. Each step is
    attempted independently: a failure is logged as a warning (with
    traceback) and never propagates, so the caller's primary delete proceeds.
    """

    def _drop_chroma() -> None:
        # Imported lazily so a missing/broken embedder only fails this step.
        from app.services.embedder import delete_paper

        delete_paper(arxiv_id)

    cleanup_steps = (
        ("Failed to clean FTS index for %s", lambda: delete_fts_paper(db, paper_id)),
        ("Failed to clean ChromaDB index for %s", _drop_chroma),
    )
    for failure_message, step in cleanup_steps:
        try:
            step()
        except Exception:
            logger.warning(failure_message, arxiv_id, exc_info=True)
|
|
|
|
|
|
def reindex_paper_fts(db: Session, paper: Paper) -> None:
    """Rebuild a single paper's FTS5 row from the authoritative DB data.

    The existing ``papers_fts`` row (rowid = ``paper.id``) is deleted and a
    fresh row is inserted. Authors are joined with ", " in ``position`` order
    (a missing position sorts as 0). Tags are joined with ", " in relationship
    order. NULL title/abstract values are stored as "". The session is not
    committed here.
    """
    by_position = sorted(paper.authors, key=lambda author: author.position or 0)
    authors_text = ", ".join(author.name for author in by_position)
    tags_text = ", ".join(tag.tag for tag in paper.tags)

    delete_fts_paper(db, paper.id)

    insert_stmt = text(
        """
        INSERT INTO papers_fts(
            rowid, title_en, title_zh, abstract, authors, tags, summary_text
        )
        VALUES (
            :id, :title_en, :title_zh, :abstract, :authors, :tags, :summary_text
        )
        """
    )
    row = {
        "id": paper.id,
        "title_en": paper.title_en or "",
        "title_zh": paper.title_zh or "",
        "abstract": paper.abstract or "",
        "authors": authors_text,
        "tags": tags_text,
        "summary_text": _summary_text(paper),
    }
    db.execute(insert_stmt, row)
|
|
|
|
|
|
def reindex_fts(db: Session, paper_ids: list[int] | None = None) -> dict:
    """Rebuild the FTS5 index, either fully or for a subset of papers.

    Args:
        db: Session used both to read papers and to write ``papers_fts``.
        paper_ids: ``None`` requests a full rebuild: every FTS5 row is deleted
            and every paper is re-inserted. A list restricts the rebuild to
            those paper IDs, and an empty list indexes nothing. Requested IDs
            that no longer exist in ``papers`` have their stale FTS5 rows
            removed, so the index keeps following the DB.

    Returns:
        ``{"status": "success", "indexed": <number of papers re-inserted>}``.

    The session is committed once all rows have been rewritten.
    """
    query = select(Paper)
    if paper_ids is not None:
        # Compare against None, not truthiness: an empty list must not fall
        # through to selecting (and reindexing) every paper.
        query = query.where(Paper.id.in_(paper_ids))
    papers = db.execute(query).scalars().all()

    if paper_ids is None:
        # Full rebuild: clear everything, including rows of deleted papers.
        db.execute(text("DELETE FROM papers_fts"))
    else:
        # Partial rebuild: requested IDs missing from the DB would otherwise
        # keep a stale FTS5 row forever.
        present_ids = {paper.id for paper in papers}
        for stale_id in sorted(set(paper_ids) - present_ids):
            delete_fts_paper(db, stale_id)

    for paper in papers:
        reindex_paper_fts(db, paper)
    count = len(papers)

    db.commit()
    logger.info("FTS reindexed: %d papers", count)
    return {"status": "success", "indexed": count}
|
|
|
|
|
|
def reindex_chroma(db: Session) -> dict:
    """Rebuild the ChromaDB semantic index from the authoritative DB data.

    Only papers that have a summary row are passed to ``index_paper``. Each
    paper is indexed independently: a failure is logged with its traceback
    and recorded, and the loop continues with the next paper.

    Returns:
        A dict with three keys:

        - ``"status"``: ``"success"`` when no paper failed, otherwise
          ``"partial"``.
        - ``"indexed"``: the number of papers indexed successfully.
        - ``"errors"``: a list of ``"<arxiv_id>: <error>"`` strings when any
          paper failed, otherwise ``None``.

    NOTE(review): this only indexes papers currently in the DB. Entries for
    papers that were deleted are not removed here; confirm whether a true
    full rebuild also needs an embedder-side reset.
    """
    from app.services.embedder import index_paper

    papers = db.execute(select(Paper).where(Paper.summary.has())).scalars().all()
    indexed = 0
    errors: list[str] = []
    for paper in papers:
        try:
            summary = paper.summary
            # Every text field is normalized to "" (as the titles always were),
            # so index_paper never receives None for a missing summary column.
            texts_dict = {
                "arxiv_id": paper.arxiv_id,
                "title_zh": paper.title_zh or "",
                "title_en": paper.title_en or "",
                "tags": " ".join(t.tag for t in paper.tags),
                "one_line": (summary.one_line or "") if summary else "",
                "motivation_problem": (
                    (summary.motivation_problem or "") if summary else ""
                ),
                "method_key_idea": (
                    (summary.method_key_idea or "") if summary else ""
                ),
                "paper_date": paper.paper_date.isoformat() if paper.paper_date else "",
            }
            index_paper(paper.arxiv_id, texts_dict)
            indexed += 1
        except Exception as exc:
            errors.append(f"{paper.arxiv_id}: {exc}")
            logger.warning(
                "Failed to reindex ChromaDB for %s",
                paper.arxiv_id,
                exc_info=True,
            )

    return {
        "status": "success" if not errors else "partial",
        "indexed": indexed,
        "errors": errors or None,
    }
|