"""用户数据服务 — 收藏、阅读状态、个人笔记、阅读列表查询。无账号体系,数据写入本地 SQLite。""" from __future__ import annotations from datetime import datetime, timezone from sqlalchemy import or_ from sqlalchemy.orm import Session, joinedload from app.models import Paper, PaperTag, UserBookmark, UserNote, UserReadingStatus # ── 收藏 ────────────────────────────────────────────────────────────── def toggle_bookmark(db: Session, arxiv_id: str) -> dict: """切换收藏状态。返回 {"bookmarked": bool, "arxiv_id": str}。""" paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first() if not paper: return {"error": "not_found"} existing = db.query(UserBookmark).filter(UserBookmark.paper_id == paper.id).first() if existing: db.delete(existing) db.commit() return {"bookmarked": False, "arxiv_id": arxiv_id} else: bookmark = UserBookmark( paper_id=paper.id, created_at=datetime.now(timezone.utc), ) db.add(bookmark) db.commit() return {"bookmarked": True, "arxiv_id": arxiv_id} # ── 阅读状态 ────────────────────────────────────────────────────────── VALID_STATUSES = {"unread", "skimmed", "read_summary", "read_full"} def set_reading_status(db: Session, arxiv_id: str, status: str) -> dict: """设置阅读状态。status 必须是 unread/skimmed/read_summary/read_full。""" if status not in VALID_STATUSES: return {"error": "invalid_status", "valid": sorted(VALID_STATUSES)} paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first() if not paper: return {"error": "not_found"} now = datetime.now(timezone.utc) existing = ( db.query(UserReadingStatus) .filter(UserReadingStatus.paper_id == paper.id) .first() ) if existing: existing.status = status existing.updated_at = now else: db.add( UserReadingStatus( paper_id=paper.id, status=status, updated_at=now, ) ) db.commit() return {"arxiv_id": arxiv_id, "status": status} # ── 笔记 ────────────────────────────────────────────────────────────── def get_note(db: Session, arxiv_id: str) -> dict | None: """获取笔记。返回 {"arxiv_id", "content", "updated_at"} 或 None(论文不存在时)。""" paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first() if not paper: return None note = db.query(UserNote).filter(UserNote.paper_id == paper.id).first() if not note: return {"arxiv_id": arxiv_id, "content": "", "updated_at": None} return { "arxiv_id": arxiv_id, "content": note.content, "updated_at": note.updated_at.isoformat() if note.updated_at else None, } def save_note(db: Session, arxiv_id: str, content: str) -> dict: """创建或更新笔记。返回 {"arxiv_id", "content", "updated_at"}。""" paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first() if not paper: return {"error": "not_found"} now = datetime.now(timezone.utc) existing = db.query(UserNote).filter(UserNote.paper_id == paper.id).first() if existing: existing.content = content existing.updated_at = now else: db.add( UserNote( paper_id=paper.id, content=content, created_at=now, updated_at=now, ) ) db.commit() return { "arxiv_id": arxiv_id, "content": content, "updated_at": now.isoformat(), } # ── 阅读列表 ────────────────────────────────────────────────────────── def query_reading_list( db: Session, filter_type: str, tag: str | None, ) -> list[Paper]: """根据筛选条件查询阅读列表。""" # 基础:有任意用户数据的论文 base = db.query(Paper).filter( or_( Paper.bookmark.has(), Paper.reading_status.has(), Paper.note.has(), ) ) # 应用筛选 if filter_type == "has_note": base = base.filter(Paper.note.has()) elif filter_type in ("unread", "skimmed", "read_summary", "read_full"): base = base.filter( Paper.reading_status.has(UserReadingStatus.status == filter_type) ) # 应用标签 if tag: base = base.filter(Paper.tags.any(PaperTag.tag == tag)) return ( base.options( joinedload(Paper.authors), joinedload(Paper.tags), joinedload(Paper.summary_status), joinedload(Paper.bookmark), joinedload(Paper.reading_status), joinedload(Paper.note), ) .order_by(Paper.paper_date.desc(), Paper.upvotes.desc()) .all() )