"""后台 Job 服务测试。""" from __future__ import annotations from datetime import timedelta from unittest.mock import patch import pytest from sqlalchemy import select from app.models import Job, JobEvent, JobStatus, TaskLock from app.services.jobs import create_job, recover_stale_jobs, run_job from app.utils import utc_now class TestJobs: def test_create_job_writes_event(self, db_session): job = create_job( db_session, "cleanup_tmp", owner="test", payload={"reason": "unit-test"}, ) assert job.id is not None assert job.status == JobStatus.QUEUED events = ( db_session.execute(select(JobEvent).where(JobEvent.job_id == job.id)) .scalars() .all() ) assert len(events) == 1 assert events[0].stage == "created" @pytest.mark.asyncio async def test_run_job_success(self, db_session): job = create_job(db_session, "cleanup_tmp", owner="test", payload={}) with patch("app.services.cleaner.cleanup_tmp") as mock_cleanup: mock_cleanup.return_value = {"scanned": 1, "removed": 1, "errors": []} result = await run_job(db_session, job.id) refreshed = db_session.get(Job, job.id) assert result["removed"] == 1 assert refreshed.status == JobStatus.SUCCESS assert refreshed.result_json is not None @pytest.mark.asyncio async def test_run_job_failure_records_error(self, db_session): job = create_job(db_session, "missing_job_type", owner="test", payload={}) result = await run_job(db_session, job.id) refreshed = db_session.get(Job, job.id) assert result["status"] == "failed" assert refreshed.status == JobStatus.FAILED assert "Unsupported job type" in refreshed.error @pytest.mark.asyncio async def test_run_job_dispatches_refresh_upvotes(self, db_session): job = create_job( db_session, "refresh_upvotes", owner="test", payload={"days": 3}, ) with patch("app.services.crawler.refresh_upvotes") as mock_refresh: mock_refresh.return_value = {"status": "success", "updated": 2} result = await run_job(db_session, job.id) mock_refresh.assert_awaited_once_with(db_session, days=3) assert result["updated"] == 2 @pytest.mark.asyncio async def test_run_job_dispatches_reindex_fts(self, db_session): job = create_job(db_session, "reindex_fts", owner="test", payload={}) with patch("app.services.derived.reindex_fts") as mock_reindex: mock_reindex.return_value = {"status": "success", "indexed": 5} result = await run_job(db_session, job.id) mock_reindex.assert_called_once_with(db_session) assert result["indexed"] == 5 def test_recover_stale_jobs_and_locks(self, db_session): old = utc_now() - timedelta(hours=7) job = Job( type="cleanup_tmp", status=JobStatus.RUNNING, owner="test", created_at=old, started_at=old, heartbeat_at=old, ) lock = TaskLock( task="cleanup", lock_key="tmp", status="running", owner="test", acquired_at=old, ) db_session.add_all([job, lock]) db_session.commit() recovered = recover_stale_jobs(db_session) assert recovered == 2 assert db_session.get(Job, job.id).status == JobStatus.STALE assert db_session.get(TaskLock, lock.id).status == "stale"