"""Unit tests for error handling: retry, circuit breaker, per-call isolation.""" from __future__ import annotations from unittest.mock import MagicMock, patch import pytest from prometheus.application.bootstrap import SyntheticBootstrap from prometheus.application.evaluator import PromptEvaluator from prometheus.application.evolution import CircuitBreakerOpen, EvolutionLoop from prometheus.domain.entities import EvalResult, Prompt, SyntheticExample, Trajectory from prometheus.infrastructure.retry import is_transient_error, retry_with_backoff # --------------------------------------------------------------------------- # Retry utility # --------------------------------------------------------------------------- class TestIsTransientError: def test_rate_limit_429(self): assert is_transient_error(RuntimeError("HTTP 429: rate limit exceeded")) def test_server_error_503(self): assert is_transient_error(RuntimeError("503 Service Unavailable")) def test_timeout(self): assert is_transient_error(TimeoutError("request timed out")) def test_connection_error(self): assert is_transient_error(ConnectionError("connection refused")) def test_non_transient(self): assert not is_transient_error(ValueError("bad input")) def test_os_error(self): assert is_transient_error(OSError("network unreachable")) class TestRetryWithBackoff: def test_succeeds_on_first_try(self): fn = MagicMock(return_value="ok") result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0) assert result == "ok" assert fn.call_count == 1 def test_retries_on_transient_then_succeeds(self): fn = MagicMock( side_effect=[ RuntimeError("429 rate limit"), RuntimeError("429 rate limit"), "ok", ] ) with patch("prometheus.infrastructure.retry.time.sleep"): result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0) assert result == "ok" assert fn.call_count == 3 def test_raises_after_max_retries(self): fn = MagicMock(side_effect=RuntimeError("503 overloaded")) with patch("prometheus.infrastructure.retry.time.sleep"): with pytest.raises(RuntimeError, match="503"): retry_with_backoff(fn, max_retries=2, retry_delay_base=0) assert fn.call_count == 3 # 1 initial + 2 retries def test_non_transient_not_retried(self): fn = MagicMock(side_effect=ValueError("bad")) with pytest.raises(ValueError, match="bad"): retry_with_backoff(fn, max_retries=3, retry_delay_base=0) assert fn.call_count == 1 def test_exponential_backoff_delays(self): fn = MagicMock(side_effect=[RuntimeError("timeout"), "ok"]) with patch("prometheus.infrastructure.retry.time.sleep") as mock_sleep: retry_with_backoff(fn, max_retries=3, retry_delay_base=2.0) mock_sleep.assert_called_once_with(2.0) # 2.0 * 2^0 = 2.0 # --------------------------------------------------------------------------- # Circuit breaker (EvolutionLoop) # --------------------------------------------------------------------------- def _make_eval_result(scores, feedbacks=None): """Helper to create EvalResult with matching trajectories.""" feedbacks = feedbacks or ["ok"] * len(scores) return EvalResult( scores=scores, feedbacks=feedbacks, trajectories=[ Trajectory(f"input{i}", f"output{i}", s, f, "prompt") for i, (s, f) in enumerate(zip(scores, feedbacks)) ], ) class TestCircuitBreaker: def test_trips_on_consecutive_failures(self): """Loop stops when consecutive failures reach the threshold.""" initial_eval = _make_eval_result([0.3, 0.4]) evaluator = MagicMock() call_count = 0 def _evaluate(*args, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: return initial_eval # seed eval succeeds raise RuntimeError("LLM down") evaluator.evaluate.side_effect = _evaluate proposer = MagicMock() bootstrap = MagicMock(spec=SyntheticBootstrap) loop = EvolutionLoop( evaluator=evaluator, proposer=proposer, bootstrap=bootstrap, max_iterations=10, minibatch_size=2, circuit_breaker_threshold=3, error_strategy="skip", ) with patch.object(loop, "_log"): state = loop.run( Prompt("test"), [SyntheticExample("in", id=0), SyntheticExample("in2", id=1)], "task", ) error_events = [h for h in state.history if h.get("event") == "error"] cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"] assert len(error_events) == 3 assert len(cb_events) == 1 assert state.iteration < 10 # stopped early def test_abort_raises_on_first_error(self): """With error_strategy=abort, the first error raises immediately.""" initial_eval = _make_eval_result([0.3, 0.4]) evaluator = MagicMock() call_count = 0 def _evaluate(*args, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: return initial_eval raise RuntimeError("LLM down") evaluator.evaluate.side_effect = _evaluate proposer = MagicMock() bootstrap = MagicMock(spec=SyntheticBootstrap) loop = EvolutionLoop( evaluator=evaluator, proposer=proposer, bootstrap=bootstrap, max_iterations=10, minibatch_size=2, circuit_breaker_threshold=3, error_strategy="abort", ) with patch.object(loop, "_log"): with pytest.raises(RuntimeError, match="LLM down"): loop.run( Prompt("test"), [SyntheticExample("in", id=0), SyntheticExample("in2", id=1)], "task", ) def test_resets_on_success(self): """Consecutive failure counter resets after a successful iteration.""" initial_eval = _make_eval_result([0.3, 0.4]) good_eval = _make_eval_result([0.8, 0.9]) evaluator = MagicMock() call_count = 0 def _evaluate(*args, **kwargs): nonlocal call_count call_count += 1 # call 1: seed eval → succeed # call 2: iter 1 current eval → fail # call 3: iter 2 current eval → fail # call 4: iter 3 current eval → succeed (returns initial_eval) # call 5: iter 3 new eval → succeed (returns good_eval, accepted) # call 6+: iter 4+ current eval → succeed if call_count == 1: return initial_eval if call_count in (2, 3): raise RuntimeError("timeout") if call_count % 2 == 0: return initial_eval # current eval return good_eval # new eval evaluator.evaluate.side_effect = _evaluate proposer = MagicMock() proposer.propose.return_value = Prompt("better prompt") bootstrap = MagicMock(spec=SyntheticBootstrap) bootstrap.sample_minibatch.return_value = [ SyntheticExample(f"in{i}", id=i) for i in range(2) ] loop = EvolutionLoop( evaluator=evaluator, proposer=proposer, bootstrap=bootstrap, max_iterations=6, minibatch_size=2, circuit_breaker_threshold=3, error_strategy="skip", ) with patch.object(loop, "_log"): state = loop.run( Prompt("test"), [SyntheticExample("in", id=0), SyntheticExample("in2", id=1)], "task", ) # Should NOT have tripped — 2 fails, then success reset the counter cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"] assert len(cb_events) == 0 assert state.iteration == 6 # ran all iterations # --------------------------------------------------------------------------- # Per-call isolation (Evaluator) # --------------------------------------------------------------------------- class TestPerCallIsolation: def test_evaluator_isolates_execution_failure(self): """A failing execution produces a sentinel output, not a crash.""" executor = MagicMock() executor.execute.side_effect = [ "good output", RuntimeError("API error"), "another good output", ] judge = MagicMock() judge.judge_batch.return_value = [ (0.8, "good"), (0.0, "[judge error]"), (0.7, "ok"), ] evaluator = PromptEvaluator(executor, judge) result = evaluator.evaluate( Prompt("test"), [ SyntheticExample("in0", id=0), SyntheticExample("in1", id=1), SyntheticExample("in2", id=2), ], "task", ) assert len(result.scores) == 3 assert result.scores[1] == 0.0 # failed item got zero score assert "execution error" in result.trajectories[1].output_text assert result.scores[0] == 0.8 # other items unaffected def test_judge_adapter_isolates_single_failure(self): """DSPyJudgeAdapter returns sentinel for a failed item, not crash.""" from prometheus.infrastructure.judge_adapter import DSPyJudgeAdapter adapter = DSPyJudgeAdapter.__new__(DSPyJudgeAdapter) adapter._lm = MagicMock() adapter._max_retries = 1 adapter._retry_delay_base = 0 # Mock _judge to fail on first call, succeed on second call_count = 0 class FakePred: def __init__(self): self.score = 0.9 self.feedback = "good" def fake_judge(**kwargs): nonlocal call_count call_count += 1 if call_count == 1: raise RuntimeError("judge failure") return FakePred() adapter._judge = fake_judge with patch("prometheus.infrastructure.judge_adapter.dspy.context"): with patch( "prometheus.infrastructure.retry.time.sleep" ): results = adapter.judge_batch( "task", [("input1", "output1"), ("input2", "output2")] ) assert len(results) == 2 # First item failed even after retry → sentinel assert results[0] == (0.0, "[judge error: judge failure]") # Second item succeeded assert results[1] == (0.9, "good")