feat: async/parallel execution with configurable concurrency

Parallelize LLM calls across minibatches to reduce wall-clock time.
All domain ports (LLMPort, JudgePort, ProposerPort) are now async.
Adapter implementations wrap synchronous DSPy calls with asyncio.to_thread.
Judge calls run in parallel within a batch using asyncio.gather + semaphore.
Evaluator parallelizes minibatch execution with configurable concurrency.
Evolution loop and use case are fully async. Proposer stays sequential.
Added --max-concurrency CLI flag and max_concurrency YAML config field.
Added async_retry_with_backoff for async error handling.
All 139 unit tests pass.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
FullStackDev
2026-03-29 13:15:34 +00:00
parent e2d111ce5b
commit c92ca4a2b8
16 changed files with 297 additions and 159 deletions

View File

@@ -1,7 +1,7 @@
"""Unit tests for error handling: retry, circuit breaker, per-call isolation."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -96,7 +96,8 @@ def _make_eval_result(scores, feedbacks=None):
class TestCircuitBreaker:
def test_trips_on_consecutive_failures(self):
@pytest.mark.asyncio
async def test_trips_on_consecutive_failures(self):
"""Loop stops when consecutive failures reach the threshold."""
initial_eval = _make_eval_result([0.3, 0.4])
evaluator = MagicMock()
@@ -109,8 +110,9 @@ class TestCircuitBreaker:
return initial_eval # seed eval succeeds
raise RuntimeError("LLM down")
evaluator.evaluate.side_effect = _evaluate
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose = AsyncMock()
bootstrap = MagicMock(spec=SyntheticBootstrap)
loop = EvolutionLoop(
@@ -123,7 +125,7 @@ class TestCircuitBreaker:
error_strategy="skip",
)
with patch.object(loop, "_log"):
state = loop.run(
state = await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
@@ -135,7 +137,8 @@ class TestCircuitBreaker:
assert len(cb_events) == 1
assert state.iteration < 10 # stopped early
def test_abort_raises_on_first_error(self):
@pytest.mark.asyncio
async def test_abort_raises_on_first_error(self):
"""With error_strategy=abort, the first error raises immediately."""
initial_eval = _make_eval_result([0.3, 0.4])
evaluator = MagicMock()
@@ -148,8 +151,9 @@ class TestCircuitBreaker:
return initial_eval
raise RuntimeError("LLM down")
evaluator.evaluate.side_effect = _evaluate
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose = AsyncMock()
bootstrap = MagicMock(spec=SyntheticBootstrap)
loop = EvolutionLoop(
@@ -163,13 +167,14 @@ class TestCircuitBreaker:
)
with patch.object(loop, "_log"):
with pytest.raises(RuntimeError, match="LLM down"):
loop.run(
await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
)
def test_resets_on_success(self):
@pytest.mark.asyncio
async def test_resets_on_success(self):
"""Consecutive failure counter resets after a successful iteration."""
initial_eval = _make_eval_result([0.3, 0.4])
good_eval = _make_eval_result([0.8, 0.9])
@@ -194,9 +199,9 @@ class TestCircuitBreaker:
return initial_eval # current eval
return good_eval # new eval
evaluator.evaluate.side_effect = _evaluate
evaluator.evaluate = AsyncMock(side_effect=_evaluate)
proposer = MagicMock()
proposer.propose.return_value = Prompt("better prompt")
proposer.propose = AsyncMock(return_value=Prompt("better prompt"))
bootstrap = MagicMock(spec=SyntheticBootstrap)
bootstrap.sample_minibatch.return_value = [
SyntheticExample(f"in{i}", id=i) for i in range(2)
@@ -212,7 +217,7 @@ class TestCircuitBreaker:
error_strategy="skip",
)
with patch.object(loop, "_log"):
state = loop.run(
state = await loop.run(
Prompt("test"),
[SyntheticExample("in", id=0), SyntheticExample("in2", id=1)],
"task",
@@ -230,23 +235,24 @@ class TestCircuitBreaker:
class TestPerCallIsolation:
def test_evaluator_isolates_execution_failure(self):
@pytest.mark.asyncio
async def test_evaluator_isolates_execution_failure(self):
"""A failing execution produces a sentinel output, not a crash."""
executor = MagicMock()
executor.execute.side_effect = [
executor.execute = AsyncMock(side_effect=[
"good output",
RuntimeError("API error"),
"another good output",
]
])
judge = MagicMock()
judge.judge_batch.return_value = [
judge.judge_batch = AsyncMock(return_value=[
(0.8, "good"),
(0.0, "[judge error]"),
(0.7, "ok"),
]
])
evaluator = PromptEvaluator(executor, judge)
result = evaluator.evaluate(
result = await evaluator.evaluate(
Prompt("test"),
[
SyntheticExample("in0", id=0),
@@ -261,7 +267,8 @@ class TestPerCallIsolation:
assert "execution error" in result.trajectories[1].output_text
assert result.scores[0] == 0.8 # other items unaffected
def test_judge_adapter_isolates_single_failure(self):
@pytest.mark.asyncio
async def test_judge_adapter_isolates_single_failure(self):
"""DSPyJudgeAdapter returns sentinel for a failed item, not crash."""
from prometheus.infrastructure.judge_adapter import DSPyJudgeAdapter
@@ -269,6 +276,7 @@ class TestPerCallIsolation:
adapter._lm = MagicMock()
adapter._max_retries = 1
adapter._retry_delay_base = 0
adapter._semaphore = __import__("asyncio").Semaphore(5)
# Mock _judge to fail on first call, succeed on second
call_count = 0
@@ -289,9 +297,10 @@ class TestPerCallIsolation:
with patch("prometheus.infrastructure.judge_adapter.dspy.context"):
with patch(
"prometheus.infrastructure.retry.time.sleep"
"prometheus.infrastructure.retry.asyncio.sleep",
new=AsyncMock(),
):
results = adapter.judge_batch(
results = await adapter.judge_batch(
"task", [("input1", "output1"), ("input2", "output2")]
)