Files
Rain-Bus 743d69efd0 refactor: extract admin business logic to services, introduce job queue, add derived index helpers
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations)
- Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job)
- Add services/derived.py with FTS5 reindex and paper index deletion helpers
- Refactor scheduler to use job queue instead of direct pipeline calls
- Add heartbeat_at/expires_at to TaskLock for lock health tracking
- Remove DESIGN_REVIEW.md
- Update tests: remove redundant integration tests, add unit tests for new services
2026-06-13 18:31:43 +08:00

122 lines
3.4 KiB
Python

"""数据库引擎、会话工厂、初始化。"""
from sqlalchemy import event, create_engine, text
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from app.config import settings
class Base(DeclarativeBase):
    """SQLAlchemy declarative base class for ORM models.

    NOTE(review): ``init_db`` imports ``Base`` from ``app.models``;
    presumably the models there subclass this class — confirm.
    """
# ── FTS5 and index DDL (managed separately from the ORM models) ───────────────

# Full-text search table over paper fields. It is a standalone FTS5 table
# (no ``content=`` option), so its rows must be inserted/reindexed explicitly
# by application code rather than mirrored automatically from another table.
# NOTE(review): the ``unicode61`` tokenizer does not word-segment CJK text, so
# searches on ``title_zh`` presumably match only whole CJK runs — confirm this
# is acceptable for the search feature.
FTS5_CREATE_SQL = """
CREATE VIRTUAL TABLE IF NOT EXISTS papers_fts USING fts5(
title_en,
title_zh,
abstract,
authors,
tags,
summary_text,
tokenize='unicode61'
);
"""

# Partial unique index: at most one row per (task, lock_key) may have
# status = 'running'. Rows in any other status are not constrained.
TASK_LOCKS_RUNNING_INDEX_SQL = """
-- partial index for task_locks running
CREATE UNIQUE INDEX IF NOT EXISTS uq_task_locks_running
ON task_locks(task, lock_key) WHERE status = 'running';
"""

# Backward-compatible alias. This is the historical name, kept for existing
# importers even though the DDL is neither FTS5 nor a trigger.
FTS5_TRIGGER_INDEX = TASK_LOCKS_RUNNING_INDEX_SQL
def _make_engine(url: "str | None" = None, echo: "bool | None" = None):
    """Create the application's SQLAlchemy engine.

    Args:
        url: Database URL. Defaults to ``settings.DATABASE_URL`` (read at call
            time), so existing callers are unaffected. Tests or tools can pass
            their own URL and still get the same connection setup.
        echo: Whether SQLAlchemy logs emitted SQL. Defaults to
            ``settings.APP_DEBUG``.

    Returns:
        A new ``Engine``. For ``sqlite`` URLs, DBAPI connections are opened
        with ``check_same_thread=False``, and each new connection runs
        ``PRAGMA foreign_keys=ON`` and ``PRAGMA journal_mode=WAL``. These
        options are SQLite-specific and are skipped for other backends.
    """
    if url is None:
        url = settings.DATABASE_URL
    if echo is None:
        echo = settings.APP_DEBUG
    is_sqlite = str(url).startswith("sqlite")

    engine = create_engine(
        url,
        echo=echo,
        # The sqlite3 driver rejects use of a connection from any thread other
        # than its creator. Pooled connections may be handed to another thread,
        # so that check is disabled.
        connect_args={"check_same_thread": False} if is_sqlite else {},
    )

    if is_sqlite:

        @event.listens_for(engine, "connect")
        def _set_sqlite_pragma(dbapi_connection, _connection_record):
            # Runs once per new DBAPI connection. SQLite enforces foreign keys
            # only while this per-connection pragma is on.
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("PRAGMA foreign_keys=ON")
                cursor.execute("PRAGMA journal_mode=WAL")
            finally:
                # Release the cursor even if one of the pragmas fails.
                cursor.close()

    return engine
# Module-wide engine, built once at import time from the application settings.
engine = _make_engine()

# Session factory used by request handlers and services. With autoflush off,
# pending changes are not flushed implicitly before queries.
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)
def get_db():
    """FastAPI dependency that yields a database session.

    A new session is created from ``SessionLocal``. It is closed once the
    dependent code finishes, whether or not an exception was raised.
    """
    # Session's context manager calls close() on exit.
    with SessionLocal() as session:
        yield session
def _migrate(
    engine,
    migrations: "dict[str, list[tuple[str, str]]] | None" = None,
) -> None:
    """Add missing columns to existing tables (SQLite ``ALTER TABLE ADD COLUMN``).

    This is an additive-only migration. For every table in *migrations* that
    already exists, each listed column that is not yet present is appended.
    Tables that do not exist are skipped. Columns are never dropped, renamed
    or retyped.

    Args:
        engine: SQLAlchemy engine connected to the SQLite database.
        migrations: Optional ``{table: [(column, column_type_sql), ...]}``.
            Defaults to the built-in list below. Table and column names are
            quoted as identifiers. ``column_type_sql`` is inserted verbatim, so
            it must come from trusted code, never from user input.
    """
    import logging

    logger = logging.getLogger(__name__)

    if migrations is None:
        # Columns that must exist: {table: [(column, column type SQL), ...]}
        migrations = {
            "paper_summaries": [
                ("figures_json", "TEXT"),
            ],
            "crawl_logs": [
                ("details_json", "TEXT"),
            ],
            "task_locks": [
                ("heartbeat_at", "DATETIME"),
                ("expires_at", "DATETIME"),
            ],
            "jobs": [
                ("heartbeat_at", "DATETIME"),
            ],
        }

    def _quote_ident(name: str) -> str:
        # SQLite identifier quoting: wrap in double quotes and double any
        # embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    with engine.connect() as conn:
        for table, columns in migrations.items():
            # sqlite_master.type is 'table' for ordinary AND virtual tables.
            # Views, indexes and triggers are excluded.
            table_exists = conn.execute(
                text(
                    "SELECT name FROM sqlite_master "
                    "WHERE type = 'table' AND name = :name"
                ),
                {"name": table},
            ).fetchone()
            if not table_exists:
                continue

            quoted_table = _quote_ident(table)
            # PRAGMA table_info yields one row per column; index 1 is the name.
            # exec_driver_sql bypasses text()'s ":name" bind-parameter parsing,
            # which could misread colons inside a column type/DEFAULT clause.
            existing = {
                row[1]
                for row in conn.exec_driver_sql(f"PRAGMA table_info({quoted_table})")
            }
            for col_name, col_type in columns:
                if col_name in existing:
                    continue
                conn.exec_driver_sql(
                    f"ALTER TABLE {quoted_table} "
                    f"ADD COLUMN {_quote_ident(col_name)} {col_type}"
                )
                # Track the new column so a duplicated spec entry cannot
                # trigger a second (failing) ALTER.
                existing.add(col_name)
                logger.info("Migrated: %s.%s added", table, col_name)
        conn.commit()
def init_db(engine):
    """Create all ORM tables, the FTS5 virtual table and indexes, then migrate.

    Steps, in order:
      1. ``Base.metadata.create_all`` creates the ORM tables.
      2. Hand-written DDL kept outside the ORM runs in one transaction: the
         ``papers_fts`` FTS5 table and the ``task_locks`` partial unique index.
      3. ``_migrate`` adds columns missing from tables that already existed.
    """
    # Deferred import to avoid a circular import. It deliberately shadows the
    # module-level Base (hence the noqa).
    from app.models import Base  # noqa: F811 — lazy import to avoid a circular import

    Base.metadata.create_all(engine)

    # engine.begin() commits when the block exits normally.
    with engine.begin() as conn:
        for ddl in (FTS5_CREATE_SQL, FTS5_TRIGGER_INDEX):
            conn.execute(text(ddl))

    _migrate(engine)