"""Pipeline orchestration tests for ``run_pipeline`` (crawl -> summarize -> cleanup)."""

from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from unittest.mock import AsyncMock, Mock, patch

import pytest

from app.models import TaskLock
from app.services.pipeline import run_pipeline
from app.utils import utc_now

# Stage functions are patched in the module where run_pipeline looks them up,
# not where they are defined, so the orchestrator sees the mocks.
_PIPELINE_MODULE = "app.services.pipeline"


@contextmanager
def _patched_stages() -> Iterator[tuple[AsyncMock, AsyncMock, Mock]]:
    """Patch the three pipeline stages and yield ``(crawl, summarize, cleanup)`` mocks.

    ``crawl_daily`` and ``summarize_batch`` are replaced with ``AsyncMock``
    (presumably they are awaited by run_pipeline); ``cleanup_tmp`` uses the
    default mock type chosen by ``patch``, exactly as the original tests did.
    """
    with (
        patch(
            f"{_PIPELINE_MODULE}.crawl_daily", new_callable=AsyncMock
        ) as mock_crawl,
        patch(
            f"{_PIPELINE_MODULE}.summarize_batch", new_callable=AsyncMock
        ) as mock_summ,
        patch(f"{_PIPELINE_MODULE}.cleanup_tmp") as mock_clean,
    ):
        yield mock_crawl, mock_summ, mock_clean


class TestRunPipeline:
    @pytest.mark.asyncio
    async def test_full_pipeline_success(self, db_session):
        """All three stages run exactly once and the pipeline reports success."""
        with _patched_stages() as (mock_crawl, mock_summ, mock_clean):
            mock_crawl.return_value = {"status": "success", "found": 5, "new": 2}
            mock_summ.return_value = {"status": "success", "done": 2, "failed": 0}
            mock_clean.return_value = {"removed": 0}

            result = await run_pipeline(db_session, "2024-01-15", "test")

            assert result["status"] == "success"
            mock_crawl.assert_called_once()
            mock_summ.assert_called_once()
            mock_clean.assert_called_once()

    @pytest.mark.asyncio
    async def test_pipeline_lock_prevents_reentry(self, db_session):
        """An existing ``running`` lock makes run_pipeline raise RuntimeError."""
        # NOTE(review): assumes run_pipeline derives the lock as task "scheduler",
        # key "pipeline-<date>"; keep in sync with app.services.pipeline.
        db_session.add(
            TaskLock(
                task="scheduler",
                lock_key="pipeline-2024-01-15",
                status="running",
                owner="other",
                acquired_at=utc_now(),
            )
        )
        db_session.commit()

        # Stages are patched so a regression in the lock check cannot reach
        # real crawl/summarize I/O from a unit test.
        with _patched_stages() as (mock_crawl, _mock_summ, _mock_clean):
            with pytest.raises(RuntimeError, match="already running"):
                await run_pipeline(db_session, "2024-01-15", "test")

            mock_crawl.assert_not_called()

    @pytest.mark.asyncio
    async def test_crawl_failure_still_runs_summarize_and_cleanup(self, db_session):
        """A crawl exception aborts the pipeline and skips the remaining stages.

        run_pipeline catches the exception and returns ``status == "failed"``
        with the error text in ``result["error"]``. Because the exception
        leaves the try block, summarize and cleanup are never called.
        """
        # NOTE(review): the method name contradicts the asserted behaviour
        # (later stages are skipped, not run). It is kept unchanged so existing
        # test IDs / `-k` selections keep matching; consider renaming it.
        with _patched_stages() as (mock_crawl, mock_summ, mock_clean):
            mock_crawl.side_effect = ConnectionError("timeout")
            mock_summ.return_value = {"status": "success", "done": 0}
            mock_clean.return_value = {"removed": 0}

            result = await run_pipeline(db_session, "2024-01-15", "test")

            assert result["status"] == "failed"
            assert "timeout" in result["error"]
            mock_crawl.assert_called_once()
            mock_summ.assert_not_called()
            mock_clean.assert_not_called()