"""PDF 下载与源码下载 — 从 arXiv 下载论文 PDF 和 LaTeX 源码包。""" from __future__ import annotations import logging import shutil import zipfile from pathlib import Path from app.utils import PAPERS_DIR, TMP_DIR, make_http_client logger = logging.getLogger(__name__) # ── 自定义异常 ────────────────────────────────────────────────────────── class PdfDownloadError(Exception): pass # ── 路径工具 ──────────────────────────────────────────────────────────── def paper_dir(arxiv_id: str) -> Path: return PAPERS_DIR / arxiv_id def tmp_dir(arxiv_id: str) -> Path: return TMP_DIR / arxiv_id # ── PDF 下载 ──────────────────────────────────────────────────────────── async def download_pdf(arxiv_id: str, pdf_url: str) -> Path: """下载 PDF 到 data/tmp/{arxiv_id}/paper.pdf。""" if not pdf_url: raise PdfDownloadError(f"no pdf_url for {arxiv_id}") dest_dir = tmp_dir(arxiv_id) dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / "paper.pdf" try: async with make_http_client(follow_redirects=True) as client: resp = await client.get(pdf_url) resp.raise_for_status() dest.write_bytes(resp.content) except Exception as exc: raise PdfDownloadError(f"failed to download PDF for {arxiv_id}: {exc}") from exc logger.info("Downloaded PDF: %s (%d bytes)", arxiv_id, dest.stat().st_size) return dest # ── 源码下载 ──────────────────────────────────────────────────────────── async def download_source_zip(arxiv_id: str, source_url: str, dest_dir: Path) -> None: """下载 arXiv 源码并解压。""" dest_dir.mkdir(parents=True, exist_ok=True) zip_path = tmp_dir(arxiv_id) / "source.zip" try: async with make_http_client(follow_redirects=True) as client: resp = await client.get(source_url) resp.raise_for_status() zip_path.write_bytes(resp.content) except Exception as exc: logger.debug("Failed to download source for %s: %s", arxiv_id, exc) return try: with zipfile.ZipFile(zip_path, "r") as zf: zf.extractall(dest_dir) logger.debug("Extracted source for %s", arxiv_id) except zipfile.BadZipFile: # 可能是 tar.gz import tarfile try: with tarfile.open(zip_path, "r:*") as tf: tf.extractall(dest_dir, filter="data") logger.debug("Extracted source (tar) for %s", arxiv_id) except Exception: logger.warning("Cannot extract source for %s", arxiv_id) except Exception: logger.warning("Cannot extract source for %s", arxiv_id, exc_info=True) finally: if zip_path.exists(): zip_path.unlink() # ── 临时文件清理 ──────────────────────────────────────────────────────── def cleanup_tmp(arxiv_id: str) -> None: """清理 data/tmp/{arxiv_id}/ 目录。""" td = tmp_dir(arxiv_id) if td.exists(): try: shutil.rmtree(td) logger.debug("Cleaned tmp: %s", arxiv_id) except Exception: logger.warning("Failed to clean tmp for %s", arxiv_id, exc_info=True)