Aggregates all v0.2.0 sprint work (GARAA-30 through GARAA-40) and fixes 2 integration tests that broke when the codebase went async (DSPyLLMAdapter and full pipeline tests now properly await coroutines). 277 tests pass (260 unit + 17 integration). Co-Authored-By: Paperclip <noreply@paperclip.ing>
314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""Unit tests for error handling: retry, circuit breaker, per-call isolation."""
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from prometheus.application.bootstrap import SyntheticBootstrap
|
|
from prometheus.application.evaluator import PromptEvaluator
|
|
from prometheus.application.evolution import CircuitBreakerOpen, EvolutionLoop
|
|
from prometheus.domain.entities import EvalResult, Prompt, SyntheticExample, Trajectory
|
|
from prometheus.infrastructure.retry import is_transient_error, retry_with_backoff
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Retry utility
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsTransientError:
    """Tests for classifying exceptions as transient (retryable) or not."""

    def test_rate_limit_429(self):
        error = RuntimeError("HTTP 429: rate limit exceeded")
        assert is_transient_error(error)

    def test_server_error_503(self):
        error = RuntimeError("503 Service Unavailable")
        assert is_transient_error(error)

    def test_timeout(self):
        error = TimeoutError("request timed out")
        assert is_transient_error(error)

    def test_connection_error(self):
        error = ConnectionError("connection refused")
        assert is_transient_error(error)

    def test_non_transient(self):
        error = ValueError("bad input")
        assert not is_transient_error(error)

    def test_os_error(self):
        error = OSError("network unreachable")
        assert is_transient_error(error)
|
|
|
|
|
|
class TestRetryWithBackoff:
    """Tests for the synchronous ``retry_with_backoff`` helper.

    ``time.sleep`` inside the retry module is patched wherever a retry is
    expected, so no test actually waits.
    """

    def test_succeeds_on_first_try(self):
        fn = MagicMock(return_value="ok")
        result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
        assert result == "ok"
        assert fn.call_count == 1

    def test_retries_on_transient_then_succeeds(self):
        fn = MagicMock(
            side_effect=[
                RuntimeError("429 rate limit"),
                RuntimeError("429 rate limit"),
                "ok",
            ]
        )
        with patch("prometheus.infrastructure.retry.time.sleep"):
            result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
        assert result == "ok"
        assert fn.call_count == 3

    def test_raises_after_max_retries(self):
        fn = MagicMock(side_effect=RuntimeError("503 overloaded"))
        with patch("prometheus.infrastructure.retry.time.sleep"):
            with pytest.raises(RuntimeError, match="503"):
                retry_with_backoff(fn, max_retries=2, retry_delay_base=0)
        assert fn.call_count == 3  # 1 initial + 2 retries

    def test_non_transient_not_retried(self):
        fn = MagicMock(side_effect=ValueError("bad"))
        with pytest.raises(ValueError, match="bad"):
            retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
        assert fn.call_count == 1

    def test_exponential_backoff_delays(self):
        """The delay doubles on each successive retry."""
        # Two failures are needed to observe growth; with only one retry the
        # sequence of delays could not distinguish exponential from constant.
        fn = MagicMock(
            side_effect=[RuntimeError("timeout"), RuntimeError("timeout"), "ok"]
        )
        with patch("prometheus.infrastructure.retry.time.sleep") as mock_sleep:
            result = retry_with_backoff(fn, max_retries=3, retry_delay_base=2.0)
        assert result == "ok"
        assert fn.call_count == 3
        # 2.0 * 2^0 = 2.0, then 2.0 * 2^1 = 4.0
        assert [c.args for c in mock_sleep.call_args_list] == [(2.0,), (4.0,)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Circuit breaker (EvolutionLoop)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_eval_result(scores, feedbacks=None):
    """Helper to create EvalResult with matching trajectories.

    Args:
        scores: Per-example scores; one trajectory is built per score.
        feedbacks: Per-example feedback strings. Defaults to ``"ok"`` for
            every score when omitted (``None``).

    Returns:
        An ``EvalResult`` whose ``trajectories[i]`` carries ``scores[i]`` and
        ``feedbacks[i]`` with synthetic ``input{i}`` / ``output{i}`` texts.

    Raises:
        ValueError: If ``feedbacks`` is given with a different length than
            ``scores``; ``zip`` would otherwise silently drop the extras and
            produce mismatched trajectories.
    """
    # Only substitute the default when nothing was passed, so an explicit
    # (possibly empty) list is never silently replaced.
    if feedbacks is None:
        feedbacks = ["ok"] * len(scores)
    if len(feedbacks) != len(scores):
        raise ValueError(
            f"feedbacks has {len(feedbacks)} items but scores has {len(scores)}"
        )
    return EvalResult(
        scores=scores,
        feedbacks=feedbacks,
        trajectories=[
            Trajectory(f"input{i}", f"output{i}", s, f, "prompt")
            for i, (s, f) in enumerate(zip(scores, feedbacks))
        ],
    )
|
|
|
|
|
|
def _seed_then_fail(initial_eval, message):
    """Build an ``evaluate`` side effect that succeeds once, then always raises.

    The first call (the seed evaluation) returns ``initial_eval``; every later
    call raises a fresh ``RuntimeError(message)``.
    """
    calls = 0

    def _evaluate(*args, **kwargs):
        nonlocal calls
        calls += 1
        if calls == 1:
            return initial_eval  # seed eval succeeds
        raise RuntimeError(message)

    return _evaluate


def _make_breaker_loop(evaluator, proposer, bootstrap, *, max_iterations, error_strategy):
    """Build an EvolutionLoop with the circuit-breaker settings these tests share.

    Every test uses a minibatch size of 2 and a breaker threshold of 3.
    """
    return EvolutionLoop(
        evaluator=evaluator,
        proposer=proposer,
        bootstrap=bootstrap,
        max_iterations=max_iterations,
        minibatch_size=2,
        circuit_breaker_threshold=3,
        error_strategy=error_strategy,
    )


def _breaker_examples():
    """Return a fresh two-example training set for a circuit-breaker run."""
    return [SyntheticExample("in", id=0), SyntheticExample("in2", id=1)]


class TestCircuitBreaker:
    @pytest.mark.asyncio
    async def test_trips_on_consecutive_failures(self):
        """Loop stops when consecutive failures reach the threshold."""
        evaluator = MagicMock()
        evaluator.evaluate = AsyncMock(
            side_effect=_seed_then_fail(_make_eval_result([0.3, 0.4]), "LLM down")
        )
        proposer = MagicMock()
        proposer.propose = AsyncMock()
        bootstrap = MagicMock(spec=SyntheticBootstrap)

        loop = _make_breaker_loop(
            evaluator, proposer, bootstrap, max_iterations=10, error_strategy="skip"
        )
        state = await loop.run(Prompt("test"), _breaker_examples(), "task")

        error_events = [h for h in state.history if h.get("event") == "error"]
        cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"]
        assert len(error_events) == 3
        assert len(cb_events) == 1
        assert state.iteration < 10  # stopped early

    @pytest.mark.asyncio
    async def test_abort_raises_on_first_error(self):
        """With error_strategy=abort, the first error raises immediately."""
        evaluator = MagicMock()
        evaluator.evaluate = AsyncMock(
            side_effect=_seed_then_fail(_make_eval_result([0.3, 0.4]), "LLM down")
        )
        proposer = MagicMock()
        proposer.propose = AsyncMock()
        bootstrap = MagicMock(spec=SyntheticBootstrap)

        loop = _make_breaker_loop(
            evaluator, proposer, bootstrap, max_iterations=10, error_strategy="abort"
        )
        with pytest.raises(RuntimeError, match="LLM down"):
            await loop.run(Prompt("test"), _breaker_examples(), "task")

        # Seed eval + the first failing eval: nothing ran after the first error.
        assert evaluator.evaluate.await_count == 2

    @pytest.mark.asyncio
    async def test_resets_on_success(self):
        """Consecutive failure counter resets after a successful iteration."""
        initial_eval = _make_eval_result([0.3, 0.4])
        good_eval = _make_eval_result([0.8, 0.9])

        evaluator = MagicMock()
        call_count = 0

        def _evaluate(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            # call 1: seed eval → succeed
            # call 2: iter 1 current eval → fail
            # call 3: iter 2 current eval → fail
            # call 4: iter 3 current eval → succeed (returns initial_eval)
            # call 5: iter 3 new eval → succeed (returns good_eval, accepted)
            # call 6+: iter 4+ current eval → succeed
            if call_count == 1:
                return initial_eval
            if call_count in (2, 3):
                raise RuntimeError("timeout")
            if call_count % 2 == 0:
                return initial_eval  # current eval
            return good_eval  # new eval

        evaluator.evaluate = AsyncMock(side_effect=_evaluate)
        proposer = MagicMock()
        proposer.propose = AsyncMock(return_value=Prompt("better prompt"))
        bootstrap = MagicMock(spec=SyntheticBootstrap)
        bootstrap.sample_minibatch.return_value = [
            SyntheticExample(f"in{i}", id=i) for i in range(2)
        ]

        loop = _make_breaker_loop(
            evaluator, proposer, bootstrap, max_iterations=6, error_strategy="skip"
        )
        state = await loop.run(Prompt("test"), _breaker_examples(), "task")

        # Both injected failures must have been recorded; otherwise the
        # "no trip" assertion below would pass vacuously.
        error_events = [h for h in state.history if h.get("event") == "error"]
        assert len(error_events) == 2
        # Should NOT have tripped — 2 fails, then success reset the counter
        cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"]
        assert len(cb_events) == 0
        assert state.iteration == 6  # ran all iterations
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-call isolation (Evaluator)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPerCallIsolation:
    @pytest.mark.asyncio
    async def test_evaluator_isolates_execution_failure(self):
        """A failing execution produces a sentinel output, not a crash."""
        executor = MagicMock()
        executor.execute = AsyncMock(
            side_effect=[
                "good output",
                RuntimeError("API error"),
                "another good output",
            ]
        )
        judge = MagicMock()
        judge.judge_batch = AsyncMock(
            return_value=[
                (0.8, "good"),
                (0.0, "[judge error]"),
                (0.7, "ok"),
            ]
        )

        evaluator = PromptEvaluator(executor, judge)
        result = await evaluator.evaluate(
            Prompt("test"),
            [
                SyntheticExample("in0", id=0),
                SyntheticExample("in1", id=1),
                SyntheticExample("in2", id=2),
            ],
            "task",
        )

        assert len(result.scores) == 3
        assert result.scores[1] == 0.0  # failed item got zero score
        assert "execution error" in result.trajectories[1].output_text
        # Items on both sides of the failure are unaffected.
        assert result.scores[0] == 0.8
        assert result.scores[2] == 0.7

    @pytest.mark.asyncio
    async def test_judge_adapter_isolates_single_failure(self):
        """DSPyJudgeAdapter returns sentinel for a failed item, not crash."""
        import asyncio

        from prometheus.infrastructure.judge_adapter import DSPyJudgeAdapter

        # Bypass __init__ and set only the attributes judge_batch needs.
        adapter = DSPyJudgeAdapter.__new__(DSPyJudgeAdapter)
        adapter._lm = MagicMock()
        adapter._max_retries = 1
        adapter._retry_delay_base = 0
        adapter._semaphore = asyncio.Semaphore(5)
        adapter._judge_criteria = ""
        adapter._judge_dimensions = []
        adapter._dimension_names = ""
        adapter._weights = {}
        adapter.call_count = 0

        # Fake _judge: only the very first call fails, every later call succeeds.
        call_count = 0

        class FakePred:
            def __init__(self):
                self.score = 0.9
                self.feedback = "good"

        def fake_judge(**kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise RuntimeError("judge failure")
            return FakePred()

        adapter._judge = fake_judge

        with patch("prometheus.infrastructure.judge_adapter.dspy.context"):
            with patch(
                "prometheus.infrastructure.retry.asyncio.sleep",
                new=AsyncMock(),
            ):
                results = await adapter.judge_batch(
                    "task", [("input1", "output1"), ("input2", "output2")]
                )

        assert len(results) == 2
        # First item failed → sentinel. Because fake_judge only fails once, any
        # retry would have succeeded, so this also shows the failure was not
        # retried (presumably "judge failure" is classified as non-transient).
        assert results[0] == (0.0, "[judge error: judge failure]")
        # Second item succeeded
        assert results[1] == (0.9, "good")
        # One call per item, no retries.
        assert call_count == 2
|