Files
Prompt-optimizer/tests/unit/test_error_handling.py
FullStackDev b9745566c8 feat: custom judge criteria and multi-dimensional scoring
Add configurable judge rubrics and multi-dimensional scoring with
weighted aggregation. New config fields: judge_criteria (free text)
and judge_dimensions (list of {name, weight, description}). CLI
--judge-criteria flag provides quick overrides. The judge adapter
computes weighted aggregate scores and enriches feedback with
per-dimension breakdowns.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-03-29 15:40:21 +00:00

313 lines
11 KiB
Python

"""Unit tests for error handling: retry, circuit breaker, per-call isolation."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prometheus.application.bootstrap import SyntheticBootstrap
from prometheus.application.evaluator import PromptEvaluator
from prometheus.application.evolution import CircuitBreakerOpen, EvolutionLoop
from prometheus.domain.entities import EvalResult, Prompt, SyntheticExample, Trajectory
from prometheus.infrastructure.retry import is_transient_error, retry_with_backoff
# ---------------------------------------------------------------------------
# Retry utility
# ---------------------------------------------------------------------------
class TestIsTransientError:
def test_rate_limit_429(self):
assert is_transient_error(RuntimeError("HTTP 429: rate limit exceeded"))
def test_server_error_503(self):
assert is_transient_error(RuntimeError("503 Service Unavailable"))
def test_timeout(self):
assert is_transient_error(TimeoutError("request timed out"))
def test_connection_error(self):
assert is_transient_error(ConnectionError("connection refused"))
def test_non_transient(self):
assert not is_transient_error(ValueError("bad input"))
def test_os_error(self):
assert is_transient_error(OSError("network unreachable"))
class TestRetryWithBackoff:
def test_succeeds_on_first_try(self):
fn = MagicMock(return_value="ok")
result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
assert result == "ok"
assert fn.call_count == 1
def test_retries_on_transient_then_succeeds(self):
fn = MagicMock(
side_effect=[
RuntimeError("429 rate limit"),
RuntimeError("429 rate limit"),
"ok",
]
)
with patch("prometheus.infrastructure.retry.time.sleep"):
result = retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
assert result == "ok"
assert fn.call_count == 3
def test_raises_after_max_retries(self):
fn = MagicMock(side_effect=RuntimeError("503 overloaded"))
with patch("prometheus.infrastructure.retry.time.sleep"):
with pytest.raises(RuntimeError, match="503"):
retry_with_backoff(fn, max_retries=2, retry_delay_base=0)
assert fn.call_count == 3 # 1 initial + 2 retries
def test_non_transient_not_retried(self):
fn = MagicMock(side_effect=ValueError("bad"))
with pytest.raises(ValueError, match="bad"):
retry_with_backoff(fn, max_retries=3, retry_delay_base=0)
assert fn.call_count == 1
def test_exponential_backoff_delays(self):
fn = MagicMock(side_effect=[RuntimeError("timeout"), "ok"])
with patch("prometheus.infrastructure.retry.time.sleep") as mock_sleep:
retry_with_backoff(fn, max_retries=3, retry_delay_base=2.0)
mock_sleep.assert_called_once_with(2.0) # 2.0 * 2^0 = 2.0
# ---------------------------------------------------------------------------
# Circuit breaker (EvolutionLoop)
# ---------------------------------------------------------------------------
def _make_eval_result(scores, feedbacks=None):
"""Helper to create EvalResult with matching trajectories."""
feedbacks = feedbacks or ["ok"] * len(scores)
return EvalResult(
scores=scores,
feedbacks=feedbacks,
trajectories=[
Trajectory(f"input{i}", f"output{i}", s, f, "prompt")
for i, (s, f) in enumerate(zip(scores, feedbacks))
],
)
class TestCircuitBreaker:
@pytest.mark.asyncio
async def test_trips_on_consecutive_failures(self):
"""Loop stops when consecutive failures reach the threshold."""
initial_eval = _make_eval_result([0.3, 0.4])
evaluator = MagicMock()
call_count = 0
def _evaluate(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return initial_eval # seed eval succeeds
raise RuntimeError("LLM down")
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose = AsyncMock()
bootstrap = MagicMock(spec=SyntheticBootstrap)
loop = EvolutionLoop(
evaluator=evaluator,
proposer=proposer,
bootstrap=bootstrap,
max_iterations=10,
minibatch_size=2,
circuit_breaker_threshold=3,
error_strategy="skip",
)
state = await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
)
error_events = [h for h in state.history if h.get("event") == "error"]
cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"]
assert len(error_events) == 3
assert len(cb_events) == 1
assert state.iteration < 10 # stopped early
@pytest.mark.asyncio
async def test_abort_raises_on_first_error(self):
"""With error_strategy=abort, the first error raises immediately."""
initial_eval = _make_eval_result([0.3, 0.4])
evaluator = MagicMock()
call_count = 0
def _evaluate(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return initial_eval
raise RuntimeError("LLM down")
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose = AsyncMock()
bootstrap = MagicMock(spec=SyntheticBootstrap)
loop = EvolutionLoop(
evaluator=evaluator,
proposer=proposer,
bootstrap=bootstrap,
max_iterations=10,
minibatch_size=2,
circuit_breaker_threshold=3,
error_strategy="abort",
)
with pytest.raises(RuntimeError, match="LLM down"):
await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
)
@pytest.mark.asyncio
async def test_resets_on_success(self):
"""Consecutive failure counter resets after a successful iteration."""
initial_eval = _make_eval_result([0.3, 0.4])
good_eval = _make_eval_result([0.8, 0.9])
evaluator = MagicMock()
call_count = 0
def _evaluate(*args, **kwargs):
nonlocal call_count
call_count += 1
# call 1: seed eval → succeed
# call 2: iter 1 current eval → fail
# call 3: iter 2 current eval → fail
# call 4: iter 3 current eval → succeed (returns initial_eval)
# call 5: iter 3 new eval → succeed (returns good_eval, accepted)
# call 6+: iter 4+ current eval → succeed
if call_count == 1:
return initial_eval
if call_count in (2, 3):
raise RuntimeError("timeout")
if call_count % 2 == 0:
return initial_eval # current eval
return good_eval # new eval
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose = AsyncMock(return_value=Prompt("better prompt"))
bootstrap = MagicMock(spec=SyntheticBootstrap)
bootstrap.sample_minibatch.return_value = [
SyntheticExample(f"in{i}", id=i) for i in range(2)
]
loop = EvolutionLoop(
evaluator=evaluator,
proposer=proposer,
bootstrap=bootstrap,
max_iterations=6,
minibatch_size=2,
circuit_breaker_threshold=3,
error_strategy="skip",
)
state = await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
)
# Should NOT have tripped — 2 fails, then success reset the counter
cb_events = [h for h in state.history if h.get("event") == "circuit_breaker"]
assert len(cb_events) == 0
assert state.iteration == 6 # ran all iterations
# ---------------------------------------------------------------------------
# Per-call isolation (Evaluator)
# ---------------------------------------------------------------------------
class TestPerCallIsolation:
@pytest.mark.asyncio
async def test_evaluator_isolates_execution_failure(self):
"""A failing execution produces a sentinel output, not a crash."""
executor = MagicMock()
executor.execute = AsyncMock(side_effect=[
"good output",
RuntimeError("API error"),
"another good output",
])
judge = MagicMock()
judge.judge_batch = AsyncMock(return_value=[
(0.8, "good"),
(0.0, "[judge error]"),
(0.7, "ok"),
])
evaluator = PromptEvaluator(executor, judge)
result = await evaluator.evaluate(
Prompt("test"),
[
SyntheticExample("in0", id=0),
SyntheticExample("in1", id=1),
SyntheticExample("in2", id=2),
],
"task",
)
assert len(result.scores) == 3
assert result.scores[1] == 0.0 # failed item got zero score
assert "execution error" in result.trajectories[1].output_text
assert result.scores[0] == 0.8 # other items unaffected
@pytest.mark.asyncio
async def test_judge_adapter_isolates_single_failure(self):
"""DSPyJudgeAdapter returns sentinel for a failed item, not crash."""
from prometheus.infrastructure.judge_adapter import DSPyJudgeAdapter
adapter = DSPyJudgeAdapter.__new__(DSPyJudgeAdapter)
adapter._lm = MagicMock()
adapter._max_retries = 1
adapter._retry_delay_base = 0
adapter._semaphore = __import__("asyncio").Semaphore(5)
adapter._judge_criteria = ""
adapter._judge_dimensions = []
adapter._dimension_names = ""
adapter._weights = {}
# Mock _judge to fail on first call, succeed on second
call_count = 0
class FakePred:
def __init__(self):
self.score = 0.9
self.feedback = "good"
def fake_judge(**kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("judge failure")
return FakePred()
adapter._judge = fake_judge
with patch("prometheus.infrastructure.judge_adapter.dspy.context"):
with patch(
"prometheus.infrastructure.retry.asyncio.sleep",
new=AsyncMock(),
):
results = await adapter.judge_batch(
"task", [("input1", "output1"), ("input2", "output2")]
)
assert len(results) == 2
# First item failed even after retry → sentinel
assert results[0] == (0.0, "[judge error: judge failure]")
# Second item succeeded
assert results[1] == (0.9, "good")