"""Derived-data maintenance for rebuildable indexes (FTS5 / ChromaDB)."""

from __future__ import annotations

import logging

from sqlalchemy import select, text
from sqlalchemy.orm import Session

from app.models import Paper

logger = logging.getLogger(__name__)


def _summary_text(paper: Paper) -> str:
    """Join the non-empty summary fields of *paper* into one string.

    Returns an empty string when the paper has no summary.
    """
    summary = paper.summary
    if not summary:
        return ""
    parts = [
        summary.one_line,
        summary.motivation_problem,
        summary.motivation_goal,
        summary.method_overview,
        summary.method_key_idea,
        summary.results_main_json,
    ]
    # Falsy parts (None / "") are dropped so the result has no stray spaces.
    return " ".join(p for p in parts if p)


def delete_fts_paper(db: Session, paper_id: int) -> None:
    """Delete the FTS5 row of a single paper.

    The FTS5 table uses ``papers.id`` as its rowid. The statement is only
    executed on *db*; committing is left to the caller.
    """
    db.execute(
        text("DELETE FROM papers_fts WHERE rowid = :paper_id"),
        {"paper_id": paper_id},
    )


def delete_paper_indexes(db: Session, *, paper_id: int, arxiv_id: str) -> None:
    """Delete every derived index entry of a single paper.

    Cleanup is best-effort: a failure in either index is logged as a warning
    (with traceback) and does not block the caller's main deletion.
    """
    try:
        delete_fts_paper(db, paper_id)
    except Exception:
        logger.warning("Failed to clean FTS index for %s", arxiv_id, exc_info=True)

    try:
        # Imported lazily, inside the guarded region, so that an import
        # failure is handled like any other ChromaDB cleanup failure.
        from app.services.embedder import delete_paper

        delete_paper(arxiv_id)
    except Exception:
        logger.warning(
            "Failed to clean ChromaDB index for %s", arxiv_id, exc_info=True
        )


def reindex_paper_fts(db: Session, paper: Paper) -> None:
    """Rebuild the FTS5 row of one paper from the authoritative DB data.

    The existing row (if any) is deleted first and a fresh one is inserted.
    Nothing is committed here.
    """
    authors_text = ", ".join(
        a.name for a in sorted(paper.authors, key=lambda a: a.position or 0)
    )
    tags_text = ", ".join(t.tag for t in paper.tags)

    delete_fts_paper(db, paper.id)
    db.execute(
        text(
            """
            INSERT INTO papers_fts(
                rowid, title_en, title_zh, abstract, authors, tags, summary_text
            ) VALUES (
                :id, :title_en, :title_zh, :abstract, :authors, :tags, :summary_text
            )
            """
        ),
        {
            "id": paper.id,
            "title_en": paper.title_en or "",
            "title_zh": paper.title_zh or "",
            "abstract": paper.abstract or "",
            "authors": authors_text,
            "tags": tags_text,
            "summary_text": _summary_text(paper),
        },
    )


def reindex_fts(db: Session, paper_ids: list[int] | None = None) -> dict:
    """Rebuild the FTS5 index fully or for a subset of papers.

    Args:
        db: Session used for reading papers and writing the index; it is
            committed before returning.
        paper_ids: ``None`` rebuilds the whole index (the table is cleared
            first). A list restricts the rebuild to those ids; an empty list
            re-indexes nothing.

    Returns:
        ``{"status": "success", "indexed": <number of papers re-indexed>}``.
    """
    query = select(Paper)
    # Only ``None`` means "everything". Testing truthiness here would turn an
    # empty list into an unfiltered query and re-index every paper, while the
    # table wipe below (keyed on ``is None``) would be skipped.
    if paper_ids is not None:
        query = query.where(Paper.id.in_(paper_ids))
    papers = db.execute(query).scalars().all()

    if paper_ids is None:
        # Full rebuild: clear the table so rows of papers that no longer
        # exist are removed as well.
        db.execute(text("DELETE FROM papers_fts"))

    for paper in papers:
        reindex_paper_fts(db, paper)
    count = len(papers)

    db.commit()
    logger.info("FTS reindexed: %d papers", count)
    return {"status": "success", "indexed": count}


def reindex_chroma(db: Session) -> dict:
    """Rebuild the ChromaDB semantic index from the authoritative DB data.

    Only papers that have a summary are indexed. A failure on one paper is
    recorded and logged, and processing continues with the next paper.

    Returns:
        A dict with ``status`` (``"success"`` when no paper failed, otherwise
        ``"partial"``), ``indexed`` (number of papers indexed) and ``errors``
        (list of ``"<arxiv_id>: <exception>"`` strings, or ``None`` when
        there were no failures).
    """
    from app.services.embedder import index_paper

    papers = db.execute(select(Paper).where(Paper.summary.has())).scalars().all()

    indexed = 0
    errors: list[str] = []
    for paper in papers:
        try:
            summary = paper.summary
            texts_dict = {
                "arxiv_id": paper.arxiv_id,
                "title_zh": paper.title_zh or "",
                "title_en": paper.title_en or "",
                "tags": " ".join(t.tag for t in paper.tags),
                # Summary columns may be NULL; coalesce to "" like the title
                # fields so index_paper never receives None for a text field.
                "one_line": (summary.one_line or "") if summary else "",
                "motivation_problem": (
                    (summary.motivation_problem or "") if summary else ""
                ),
                "method_key_idea": (
                    (summary.method_key_idea or "") if summary else ""
                ),
                "paper_date": (
                    paper.paper_date.isoformat() if paper.paper_date else ""
                ),
            }
            index_paper(paper.arxiv_id, texts_dict)
            indexed += 1
        except Exception as exc:
            errors.append(f"{paper.arxiv_id}: {exc}")
            logger.warning(
                "Failed to reindex ChromaDB for %s",
                paper.arxiv_id,
                exc_info=True,
            )

    return {
        "status": "success" if not errors else "partial",
        "indexed": indexed,
        "errors": errors or None,
    }