Files
Prompt-optimizer/src/prometheus/infrastructure/proposer_adapter.py
FullStackDev a5bf2ad59c feat: v0.2.0 sprint — ground truth eval, crossover/mutation, checkpointing, similarity guards, dataset loader, CLI commands, extended test coverage
Aggregates all v0.2.0 sprint work (GARAA-30 through GARAA-40) and fixes
2 integration tests that broke when the codebase went async (DSPyLLMAdapter
and full pipeline tests now properly await coroutines).

277 tests pass (260 unit + 17 integration).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-03-29 19:13:50 +00:00

77 lines
2.5 KiB
Python

"""
Adapter: Reflective Mutation Proposer.
Implements the ProposerPort via the DSPy InstructionProposer.
Converts trajectories into readable format for the LLM proposer.
"""
from __future__ import annotations
import asyncio
import dspy
from prometheus.domain.entities import Prompt, Trajectory
from prometheus.domain.ports import ProposerPort
from prometheus.infrastructure.dspy_modules import InstructionProposer
from prometheus.infrastructure.retry import async_retry_with_backoff
class DSPyProposerAdapter(ProposerPort):
"""Uses evaluation trajectories to build a failure report and propose a new prompt."""
def __init__(
self,
lm: dspy.LM,
max_retries: int = 3,
retry_delay_base: float = 1.0,
) -> None:
self._lm = lm
self._proposer = InstructionProposer()
self._max_retries = max_retries
self._retry_delay_base = retry_delay_base
self.call_count: int = 0
async def propose(
self,
current_prompt: Prompt,
trajectories: list[Trajectory],
task_description: str,
) -> Prompt:
failure_examples = self._format_failures(trajectories)
async def _call() -> Prompt:
return await asyncio.to_thread(
self._sync_propose, current_prompt, task_description, failure_examples,
)
return await async_retry_with_backoff(
_call,
max_retries=self._max_retries,
retry_delay_base=self._retry_delay_base,
)
def _sync_propose(self, current_prompt: Prompt, task_description: str, failure_examples: str) -> Prompt:
with dspy.context(lm=self._lm):
pred = self._proposer(
current_instruction=current_prompt.text,
task_description=task_description,
failure_examples=failure_examples,
)
self.call_count += 1
return Prompt(text=pred.new_instruction)
@staticmethod
def _format_failures(trajectories: list[Trajectory]) -> str:
"""Convert trajectories into a structured textual report."""
sections: list[str] = []
for i, t in enumerate(trajectories, 1):
section = (
f"# Example {i}\n"
f"## Input\n{t.input_text}\n\n"
f"## Generated Output\n{t.output_text}\n\n"
f"## Score\n{t.score:.2f}\n\n"
f"## Feedback\n{t.feedback}\n"
)
sections.append(section)
return "\n---\n".join(sections)