21f16e6756
- Split summarizer into summary_generator and summary_persister modules - Refactor pdf_image_extractor to two-phase pipeline with PicoDet layout detection - Add layout_detector service for PicoDet-S_layout_3cls integration - Add exceptions module with ConflictError and NotFoundError - Improve admin dashboard with better statistics and task management - Add design review document with system optimization suggestions - Add new tests for crawler, pdf_downloader, pipeline, and summary_utils - Update dependencies and configuration - Clean up dead code and improve error handling
239 lines
6.7 KiB
Python
239 lines
6.7 KiB
Python
"""清理和删除服务 — 临时文件清理、按日期范围删除论文。"""
|
||
|
||
from __future__ import annotations

import json
import logging
import shutil
import time
from datetime import date
from pathlib import Path

from sqlalchemy import select, text
from sqlalchemy.orm import Session

from app.models import (
    CrawlLog,
    DataDeleteJob,
    Paper,
)
from app.utils import PAPERS_DIR, TMP_DIR, utc_now
|
||
|
||
logger = logging.getLogger(__name__)

# Maximum age (in hours) a temporary directory may reach before cleanup_tmp removes it.
_MAX_TMP_AGE_HOURS = 24


# ── Temporary file cleanup ──────────────────────────────────────────────────


def cleanup_tmp(
    max_age_hours: int = _MAX_TMP_AGE_HOURS,
    *,
    tmp_dir: Path | None = None,
) -> dict:
    """Remove sub-directories of the temp directory older than ``max_age_hours``.

    Only directories directly under the temp root are considered; plain files
    are skipped and not counted. A directory's age is judged by its own mtime.

    Args:
        max_age_hours: Maximum age in hours before a directory is removed
            (defaults to ``_MAX_TMP_AGE_HOURS``, i.e. 24).
        tmp_dir: Directory to scan. Defaults to ``TMP_DIR``; useful for tests
            or alternative data layouts.

    Returns:
        Cleanup stats ``{"scanned": int, "removed": int, "errors": list[str]}``:
        ``scanned`` counts the sub-directories inspected, ``removed`` those
        deleted, and ``errors`` holds ``"<name>: <exception>"`` strings for
        directories that could not be inspected or removed.
    """
    root = TMP_DIR if tmp_dir is None else Path(tmp_dir)
    if not root.exists():
        return {"scanned": 0, "removed": 0, "errors": []}

    # st_mtime is in POSIX epoch seconds, so compare it against time.time().
    # A cutoff derived from datetime.timestamp() is only right for tz-aware
    # datetimes; a naive UTC value would be read as local time and shift the
    # cutoff by the host's UTC offset.
    cutoff = time.time() - max_age_hours * 3600
    scanned = 0
    removed = 0
    errors: list[str] = []

    for entry in root.iterdir():
        if not entry.is_dir():
            continue
        scanned += 1
        try:
            # NOTE: a directory's mtime changes when entries inside it are
            # added/removed/renamed, not when an existing file is rewritten.
            if entry.stat().st_mtime < cutoff:
                shutil.rmtree(entry)
                removed += 1
                logger.info("Cleaned tmp dir: %s", entry.name)
        except Exception as exc:
            # Best-effort: record the failure and keep going with other entries.
            errors.append(f"{entry.name}: {exc}")
            logger.warning("Failed to clean tmp dir %s: %s", entry.name, exc)

    logger.info(
        "Tmp cleanup: scanned=%d removed=%d errors=%d", scanned, removed, len(errors)
    )
    return {"scanned": scanned, "removed": removed, "errors": errors}
|
||
|
||
|
||
# ── 按日期范围删除 ─────────────────────────────────────────────────────────
|
||
|
||
|
||
def _remove_subdir(base: Path, name: str) -> Path | None:
    """Recursively delete ``base / name`` and return it, or return None if absent.

    Raises:
        ValueError: if ``base / name`` does not resolve to a path strictly inside
            ``base`` (e.g. an empty name or one containing ``..``). Without this
            guard an empty id would make ``base / name`` equal ``base`` and the
            whole base directory would be removed.
    """
    target = base / name
    if base.resolve() not in target.resolve().parents:
        raise ValueError(f"refusing to remove {target}: not inside {base}")
    if not target.exists():
        return None
    shutil.rmtree(target)
    return target


def _delete_paper_record(db: Session, paper: Paper, paper_id: int) -> None:
    """Delete one paper's full-text-index row and ORM row, then commit."""
    # Assumes papers_fts rowids mirror papers.id, as this query implies.
    db.execute(
        text("DELETE FROM papers_fts WHERE rowid = :paper_id"),
        {"paper_id": paper_id},
    )
    # Dependent rows are left to the cascade configured on the Paper model.
    db.delete(paper)
    db.commit()


def _cleanup_paper_artifacts(arxiv_id: str) -> None:
    """Best-effort removal of a deleted paper's vector entry and local directories.

    Every failure is logged as a warning and swallowed: the paper's database
    record is already gone, so leftovers are orphans rather than broken papers.
    """
    try:
        # Imported lazily, as in the original loop.
        from app.services.embedder import delete_paper

        delete_paper(arxiv_id)
    except Exception:
        logger.warning("Failed to delete %s from ChromaDB", arxiv_id, exc_info=True)

    for base, message in (
        (PAPERS_DIR, "Removed paper dir: %s"),
        (TMP_DIR, "Removed tmp dir: %s"),
    ):
        try:
            removed = _remove_subdir(base, arxiv_id)
        except Exception:
            logger.warning(
                "Failed to remove %s entry for deleted paper %s",
                base,
                arxiv_id,
                exc_info=True,
            )
            continue
        if removed is not None:
            logger.debug(message, removed)


async def delete_papers_by_date_range(
    db: Session,
    date_start: date,
    date_end: date,
    *,
    include_notes: bool = True,
) -> dict:
    """Delete every paper whose ``paper_date`` lies in ``[date_start, date_end]``.

    Each paper is deleted and committed on its own, so a failure never undoes
    papers that were already deleted:

    1. Select the target papers.
    2. Per paper, in one transaction: delete its ``papers_fts`` row and the
       ``Paper`` row (related rows such as authors, tags, summary, bookmarks,
       reading status and notes are expected to go via the ORM cascade on
       ``Paper``), then commit. On error only that paper is rolled back and it
       is reported in ``failed_items``.
    3. Only after that commit succeeds, best-effort remove the paper's vector
       index entry and the directories ``PAPERS_DIR/{arxiv_id}`` and
       ``TMP_DIR/{arxiv_id}``. Failures here are logged, not counted as failed.
    4. Record the outcome on a ``DataDeleteJob`` row and a ``CrawlLog`` row.

    NOTE(review): declared ``async`` but never awaits; all database and
    filesystem work runs synchronously on the event loop.

    Args:
        db: Database session; this function commits it several times.
        date_start: First date of the range (inclusive).
        date_end: Last date of the range (inclusive).
        include_notes: Stored on the job row only; not otherwise consulted here.

    Returns:
        ``{"total", "deleted", "failed", "failed_items", "status"}``, where
        ``failed_items`` is a list of ``{"arxiv_id", "error"}`` dicts and
        ``status`` is ``"failed"`` only when some papers failed and none were
        deleted, otherwise ``"success"``.
    """
    now = utc_now()

    papers = (
        db.execute(
            select(Paper).where(
                Paper.paper_date >= date_start,
                Paper.paper_date <= date_end,
            )
        )
        .scalars()
        .all()
    )
    # Capture identifiers before any commit expires the loaded ORM objects, so
    # failures can always be reported by arxiv_id without another round trip.
    targets = [(paper, paper.id, paper.arxiv_id) for paper in papers]

    total = len(targets)
    logger.info(
        "Delete papers by date range: %s ~ %s, found %d papers",
        date_start,
        date_end,
        total,
    )

    # Persist the job as "running" before the loop starts.
    job = DataDeleteJob(
        date_start=date_start,
        date_end=date_end,
        include_notes=include_notes,
        paper_count=total,
        status="running",
        started_at=now,
    )
    db.add(job)
    db.commit()

    deleted = 0
    failed_items: list[dict] = []

    for paper, paper_id, arxiv_id in targets:
        try:
            _delete_paper_record(db, paper, paper_id)
        except Exception as exc:
            # Earlier papers are already committed; this discards only this one.
            db.rollback()
            failed_items.append({"arxiv_id": arxiv_id, "error": str(exc)})
            logger.error("Failed to delete paper %s: %s", arxiv_id, exc)
            continue

        deleted += 1
        logger.debug("Deleted paper: %s", arxiv_id)
        # Files and vectors go only after the DB commit, so a DB failure never
        # leaves a surviving paper row whose files are already gone.
        _cleanup_paper_artifacts(arxiv_id)

    job_status = "failed" if failed_items and deleted == 0 else "success"
    job_error = (
        "; ".join(f"{f['arxiv_id']}: {f['error']}" for f in failed_items[:20])
        if failed_items
        else None
    )

    job.status = job_status
    job.paper_count = deleted
    job.completed_at = utc_now()
    if job_error:
        job.error = job_error[:4000]
    db.commit()

    log_entry = CrawlLog(
        task="delete",
        status=job_status,
        started_at=now,
        completed_at=utc_now(),
        papers_found=total,
        papers_new=deleted,
        details_json=json.dumps(
            {
                "total_before": total,
                "deleted": deleted,
                "failed": len(failed_items),
            },
            ensure_ascii=False,
        ),
        error=job_error,
    )
    db.add(log_entry)
    db.commit()

    result = {
        "total": total,
        "deleted": deleted,
        "failed": len(failed_items),
        "failed_items": failed_items,
        "status": job_status,
    }
    logger.info(
        "Delete job completed: date_range=%s~%s total=%d deleted=%d failed=%d",
        date_start,
        date_end,
        total,
        deleted,
        len(failed_items),
    )
    return result
|