aboutsummaryrefslogtreecommitdiff
path: root/benchmark/agbenchmark/challenges/builtin.py
blob: 5906966881f209b6923efeb5912e3ad58d473799 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
from collections import deque
import glob
import json
import logging
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, ClassVar, Iterator, Literal, Optional

import pytest
from agent_protocol_client import AgentApi, ApiClient, Configuration as ClientConfig
from colorama import Fore, Style
from openai import _load_client as get_openai_client
from pydantic import BaseModel, constr, Field, validator

from agbenchmark.agent_api_interface import download_agent_artifacts_into_folder
from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
from agbenchmark.utils.prompts import (
    END_PROMPT,
    FEW_SHOT_EXAMPLES,
    PROMPT_MAP,
    SCORING_MAP,
)

from .base import BaseChallenge, ChallengeInfo

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Loaded once, at import time, from the "optional_categories.json" file that sits
# next to this module; the file must contain an "optional_categories" key.
with open(Path(__file__).parent / "optional_categories.json") as f:
    OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"]


# Schema of a built-in challenge specification file. Instances are created from
# JSON via `BuiltinChallengeSpec.parse_file` (see
# `BuiltinChallenge.from_challenge_spec_file`).
class BuiltinChallengeSpec(BaseModel):
    eval_id: str = ""
    name: str
    # The task text; it is interpolated into the LLM evaluation prompt
    task: str
    category: list[Category]
    dependencies: list[str]
    # Used by `BuiltinChallenge.test_method` as the default timeout for a run
    cutoff: int

    # Descriptive metadata about the challenge
    class Info(BaseModel):
        difficulty: DifficultyLevel
        # Must start with the literal text "Tests if the agent can"
        description: constr(regex=r"^Tests if the agent can.*")
        side_effects: list[str] = Field(default_factory=list)

    info: Info

    # Ground truth and evaluation settings for the challenge
    class Ground(BaseModel):
        # Reference answer; passed on as `ChallengeInfo.reference_answer` and
        # interpolated into the LLM evaluation prompt
        answer: str
        # Substrings that the evaluated output must / must not contain
        should_contain: Optional[list[str]] = None
        should_not_contain: Optional[list[str]] = None
        # Files to evaluate: an entry starting with "." is treated as a file
        # extension to glob for, anything else as a specific file name
        files: list[str]
        # When falsy, substring checks are done on lower-cased text
        case_sensitive: Optional[bool] = True

        # How the output is evaluated. The code in this module distinguishes the
        # `type` values "llm", "python" and "pytest".
        class Eval(BaseModel):
            type: str
            scoring: Optional[Literal["percentage", "scale", "binary"]]
            template: Optional[Literal["rubric", "reference", "question", "custom"]]
            examples: Optional[str]

            # `scoring` and `template` are mandatory for type "llm" and must be
            # absent for every other type. `always=True` makes the validator run
            # even when the field was not provided.
            @validator("scoring", "template", always=True)
            def validate_eval_fields(cls, v, values, field):
                if "type" in values and values["type"] == "llm":
                    if v is None:
                        raise ValueError(
                            f"{field.name} must be provided when eval type is 'llm'"
                        )
                else:
                    if v is not None:
                        raise ValueError(
                            f"{field.name} should only exist when eval type is 'llm'"
                        )
                return v

        eval: Eval

    ground: Ground

    metadata: Optional[dict[str, Any]] = None
    # Location of the file the spec was loaded from. Not part of the JSON data:
    # it is assigned after parsing and declared with `exclude=True`.
    spec_file: Path | None = Field(None, exclude=True)


class BuiltinChallenge(BaseChallenge):
    """
    Base class for AGBenchmark's built-in challenges (challenges/**/*.json).

    All of the logic is present in this class. Individual challenges are created as
    subclasses of `BuiltinChallenge` with challenge-specific values assigned to the
    ClassVars `_spec` etc.

    Dynamically constructing subclasses rather than class instances for the individual
    challenges makes them suitable for collection by Pytest, which will run their
    `test_method` like any regular test item.
    """

    # The parsed challenge specification; assigned on each generated subclass
    _spec: ClassVar[BuiltinChallengeSpec]
    # Path of the challenge's spec file, as a string
    CHALLENGE_LOCATION: ClassVar[str]
    # Resolved parent directory of the challenge's spec file, as a string
    ARTIFACTS_LOCATION: ClassVar[str]

    # Prefix that `from_source_uri` requires on the URIs it accepts
    SOURCE_URI_PREFIX = "__BUILTIN__"

    @classmethod
    def from_challenge_spec(
        cls, spec: BuiltinChallengeSpec
    ) -> type["BuiltinChallenge"]:
        """
        Create a `BuiltinChallenge` subclass for the given challenge spec.

        Params:
            spec: The challenge specification; its `spec_file` must be set.

        Returns:
            A new class named `Test<challenge name>` that derives from
            `BuiltinChallenge` and carries the spec and its file locations.

        Raises:
            ValueError: If `spec.spec_file` is not set.
        """
        spec_path = spec.spec_file
        if not spec_path:
            raise ValueError("spec.spec_file not defined")

        # The source URI identifies the spec relative to this module's directory
        relative_spec_path = spec_path.relative_to(Path(__file__).parent)

        info = ChallengeInfo(
            eval_id=spec.eval_id,
            name=spec.name,
            task=spec.task,
            task_artifacts_dir=spec_path.parent,
            category=spec.category,
            difficulty=spec.info.difficulty,
            description=spec.info.description,
            dependencies=spec.dependencies,
            reference_answer=spec.ground.answer,
            source_uri=f"__BUILTIN__/{relative_spec_path}",
        )

        class_name = f"Test{info.name}"
        logger.debug(f"Creating {class_name} from spec: {spec_path}")

        class_namespace = {
            "info": info,
            "_spec": spec,
            "CHALLENGE_LOCATION": str(spec_path),
            "ARTIFACTS_LOCATION": str(spec_path.resolve().parent),
        }
        return type(class_name, (BuiltinChallenge,), class_namespace)

    @classmethod
    def from_challenge_spec_file(cls, spec_file: Path) -> type["BuiltinChallenge"]:
        """Parse the spec at `spec_file` and build the challenge class for it."""
        spec = BuiltinChallengeSpec.parse_file(spec_file)
        # `spec_file` is not part of the parsed data, so record the origin here;
        # `from_challenge_spec` refuses specs without it.
        spec.spec_file = spec_file
        return cls.from_challenge_spec(spec)

    @classmethod
    def from_source_uri(cls, source_uri: str) -> type["BuiltinChallenge"]:
        """
        Build the challenge class referenced by a `<SOURCE_URI_PREFIX>/<path>` URI.

        Params:
            source_uri: URI whose part after the prefix is the spec file's path
                relative to this module's directory.

        Returns:
            The challenge class created from the referenced spec file.

        Raises:
            ValueError: If `source_uri` does not start with
                `<SOURCE_URI_PREFIX>/` or has no path after it.
        """
        # Require the separator as well, so that a bare prefix (no "/") or a
        # look-alike such as "<prefix>junk/..." is rejected with a ValueError
        # rather than failing later or being silently accepted.
        uri_prefix = f"{cls.SOURCE_URI_PREFIX}/"
        if not source_uri.startswith(uri_prefix):
            raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}")

        path = source_uri[len(uri_prefix) :]
        if not path:
            raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}")

        spec_file = Path(__file__).parent / path
        return cls.from_challenge_spec_file(spec_file)

    @pytest.mark.asyncio
    async def test_method(
        self,
        config: AgentBenchmarkConfig,
        request: pytest.FixtureRequest,
        i_attempt: int,
    ) -> None:
        """
        Run the challenge, evaluate the resulting task state and assert on it.

        The outcome is also recorded on `request.node.user_properties` under the
        keys "timed_out", "answers" and "scores".

        Params:
            config: Benchmark configuration; `config.host` is used to reach the agent.
            request: Pytest request; used for CLI options and `user_properties`.
            i_attempt: Not used in the body of this method.

        Raises:
            TimeoutError: If the run timed out and produced nothing to evaluate.
            ValueError: If the run finished but produced nothing to evaluate.
            AssertionError: If none of the evaluation results passed.
        """
        if os.environ.get("HELICONE_API_KEY"):
            from helicone.lock import HeliconeLockManager

            HeliconeLockManager.write_custom_property("challenge", self.info.name)

        # Default to the spec's cutoff, falling back to 60 if it is falsy (e.g. 0)
        timeout = self._spec.cutoff or 60

        # `--nc` takes precedence over `--cutoff`
        if request.config.getoption("--nc"):
            timeout = 100000
        elif cutoff := request.config.getoption("--cutoff"):
            timeout = int(cutoff)  # type: ignore

        task_id = ""
        timed_out = None
        try:
            async for step in self.run_challenge(config, timeout):
                # Remember the task ID reported by the first step
                if not task_id:
                    task_id = step.task_id
                if request.config.getoption("--mock"):
                    # Run only one step in mock mode
                    break
            timed_out = False
        except TimeoutError:
            timed_out = True
        request.node.user_properties.append(("timed_out", timed_out))

        # NOTE(review): if the run times out before yielding a step, `task_id` is
        # still "" when it is passed to `evaluate_task_state` below.
        agent_client_config = ClientConfig(host=config.host)
        async with ApiClient(agent_client_config) as api_client:
            api_instance = AgentApi(api_client)
            eval_results = await self.evaluate_task_state(api_instance, task_id)

        if not eval_results:
            if timed_out:
                raise TimeoutError("Timed out, no results to evaluate")
            else:
                raise ValueError("No results to evaluate")

        # The answers are only recorded when `--keep-answers` is given
        request.node.user_properties.append(
            (
                "answers",
                [r.result for r in eval_results]
                if request.config.getoption("--keep-answers")
                else None,
            )
        )
        request.node.user_properties.append(("scores", [r.score for r in eval_results]))

        # FIXME: this allows partial failure
        assert any(r.passed for r in eval_results), (
            f"No passed evals: {eval_results}"
            if not timed_out
            else f"Timed out; no passed evals: {eval_results}"
        )

    @classmethod
    async def evaluate_task_state(
        cls, agent: AgentApi, task_id: str
    ) -> list[EvalResult]:
        """
        Evaluate the artifacts of an agent task inside a temporary workspace.

        Params:
            agent: API client used to download the task's artifacts.
            task_id: ID of the task whose artifacts are evaluated.

        Returns:
            The evaluation results for the downloaded workspace content.
        """
        with tempfile.TemporaryDirectory() as tmp_dir:
            workspace = Path(tmp_dir)
            await download_agent_artifacts_into_folder(agent, task_id, workspace)

            artifacts_dir = cls.info.task_artifacts_dir
            if artifacts_dir:
                copy_challenge_artifacts_into_workspace(
                    artifacts_dir, "custom_python", workspace
                )

            # Evaluation is lazy, so it must be fully consumed while the
            # temporary directory still exists.
            results = [*cls.evaluate_workspace_content(workspace)]

        return results

    @classmethod
    def evaluate_workspace_content(cls, workspace: Path) -> Iterator[EvalResult]:
        """
        Score the content of `workspace` against the challenge's ground truth.

        Yields one result per evaluated output when the ground truth defines
        `should_contain` / `should_not_contain`, followed by one combined result
        when the eval type is "llm". In mock mode (env var `IS_MOCK` set) with an
        empty task, a single passing mock result is yielded instead.

        Params:
            workspace: Directory holding the files to evaluate.

        Yields:
            `EvalResult` objects; `passed` is set when the score exceeds 0.9.
        """
        if cls._spec.task == "" and os.getenv("IS_MOCK"):
            yield EvalResult(
                result="This is a mock answer",
                result_source="step_output",
                score=1.0,
                passed=True,
            )
            return

        result_ground = cls._spec.ground
        check_keywords = bool(
            result_ground.should_contain or result_ground.should_not_contain
        )
        check_with_llm = result_ground.eval.type == "llm"

        if not (check_keywords or check_with_llm):
            # Nothing below would read the outputs, so don't collect them
            # (collecting may execute files from the workspace).
            return

        # `get_outputs_for_eval` returns a one-shot iterator, but the outputs are
        # read several times below (keyword check, LLM prompt, result source).
        # Materialize them once so that later readers don't see an empty iterator.
        outputs_for_eval = list(cls.get_outputs_for_eval(workspace, result_ground))

        if check_keywords:
            for source, content in outputs_for_eval:
                score = cls.score_result(content, result_ground)
                if score is not None:
                    print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score)
                    yield EvalResult(
                        result=content,
                        result_source=str(source),
                        score=score,
                        passed=score > 0.9,  # FIXME: arbitrary threshold
                    )

        if check_with_llm:
            combined_results = "\n".join(output[1] for output in outputs_for_eval)
            llm_eval = cls.score_result_with_llm(combined_results, result_ground)
            print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval)

            # Normalize the LLM's score to the 0..1 range
            if result_ground.eval.scoring == "percentage":
                score = llm_eval / 100
            elif result_ground.eval.scoring == "scale":
                score = llm_eval / 10
            else:
                score = llm_eval

            yield EvalResult(
                result=combined_results,
                result_source=", ".join(str(res[0]) for res in outputs_for_eval),
                score=score,
                passed=score > 0.9,  # FIXME: arbitrary threshold
            )

    @staticmethod
    def get_outputs_for_eval(
        workspace: str | Path | dict[str, str], ground: BuiltinChallengeSpec.Ground
    ) -> Iterator[tuple[str | Path, str]]:
        """
        Collect the outputs in `workspace` that should be evaluated.

        For each entry of `ground.files`, the matching files are either executed
        with the current Python interpreter (eval type "python") or read as text
        (any other eval type). For eval type "pytest", `pytest` is additionally
        run in the workspace after all files were processed.

        Params:
            workspace: The workspace directory, or a dict whose "output" entry is
                the workspace directory.
            ground: Ground truth spec providing `files` and `eval.type`.

        Yields:
            `(source, content)` tuples: `source` is the file's path relative to
            the workspace (or the string "pytest"), `content` is the file's text
            or the captured stdout prefixed with "Output: ".

        Raises:
            AssertionError: If an executed script or pytest exits with a non-zero
                status or has "error" in its stderr output.
        """
        if isinstance(workspace, dict):
            workspace = workspace["output"]

        def run_checked(command: list[str]) -> str:
            # Run `command` in the workspace; return its stdout or fail loudly.
            result = subprocess.run(
                command,
                cwd=os.path.abspath(workspace),
                capture_output=True,
                text=True,
            )
            if "error" in result.stderr or result.returncode != 0:
                print(result.stderr)
                # Raised explicitly (instead of `assert False`) so that the
                # failure is not stripped when Python runs with `-O`.
                raise AssertionError(result.stderr)
            return result.stdout

        for file_pattern in ground.files:
            # Check if it is a file extension
            if file_pattern.startswith("."):
                # Find all files with the given extension in the workspace
                matching_files = glob.glob(os.path.join(workspace, "*" + file_pattern))
            else:
                # Otherwise, it is a specific file
                matching_files = [os.path.join(workspace, file_pattern)]

            for file_path in matching_files:
                relative_path = Path(file_path).relative_to(workspace)
                if ground.eval.type == "python":
                    stdout = run_checked([sys.executable, file_path])
                    yield relative_path, f"Output: {stdout}\n"
                else:
                    with open(file_path, "r") as f:
                        yield relative_path, f.read()

        # Runs after all files were processed. (Previously the `else` clause of
        # the `for` loop, which always executed as the loop has no `break`.)
        if ground.eval.type == "pytest":
            stdout = run_checked([sys.executable, "-m", "pytest"])
            yield "pytest", f"Output: {stdout}\n"

    @staticmethod
    def score_result(content: str, ground: BuiltinChallengeSpec.Ground) -> float | None:
        """
        Score `content` against the keyword criteria of `ground`.

        Every entry of `ground.should_contain` must occur in `content` and no
        entry of `ground.should_not_contain` may occur in it. If
        `ground.case_sensitive` is falsy, the comparison is done on lower-cased
        text. Each check is printed as it is performed.

        Params:
            content: The text to check.
            ground: Ground truth spec providing the keyword criteria.

        Returns:
            1.0 if all criteria are met, 0.0 as soon as one is violated, or None
            if `ground` defines no keyword criteria at all.
        """
        print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content)

        if not ground.should_contain and not ground.should_not_contain:
            # No criteria to score against
            return None

        if not ground.case_sensitive:
            content = content.lower()

        for should_contain_word in ground.should_contain or []:
            if not ground.case_sensitive:
                should_contain_word = should_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}"
                f" - {should_contain_word}:"
            )
            if should_contain_word not in content:
                print(print_content, "False")
                return 0.0
            # Keep going: every word has to be checked, not just the first one
            print(print_content, "True")

        for should_not_contain_word in ground.should_not_contain or []:
            if not ground.case_sensitive:
                should_not_contain_word = should_not_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}"
                f" - {should_not_contain_word}:"
            )
            if should_not_contain_word in content:
                print(print_content, "False")
                return 0.0
            print(print_content, "True")

        # All criteria were checked and none was violated
        return 1.0

    @classmethod
    def score_result_with_llm(
        cls, content: str, ground: BuiltinChallengeSpec.Ground
    ) -> float:
        """
        Ask an LLM to score `content` against the challenge's ground truth.

        Params:
            content: The response text to be scored.
            ground: Ground truth spec providing the answer and the eval settings
                (scoring mode, prompt template, optional examples).

        Returns:
            The LLM's reply converted to a float, or 1.0 without calling the LLM
            when the `IS_MOCK` environment variable is set.
        """
        if os.getenv("IS_MOCK"):
            return 1.0

        eval_spec = ground.eval

        # the validation for this is done in the Eval BaseModel
        scoring = SCORING_MAP[eval_spec.scoring]  # type: ignore
        template = PROMPT_MAP[eval_spec.template]  # type: ignore

        # Assemble the prompt: main template, optional examples, closing part
        prompt_parts = [
            template.format(
                task=cls._spec.task,
                scoring=scoring,
                answer=ground.answer,
                response=content,
            )
        ]
        if eval_spec.examples:
            prompt_parts.append(FEW_SHOT_EXAMPLES.format(examples=eval_spec.examples))
        prompt_parts.append(END_PROMPT)
        prompt = "".join(prompt_parts)

        completion = get_openai_client().chat.completions.create(
            model="gpt-4",
            messages=[{"role": "system", "content": prompt}],
        )

        return float(completion.choices[0].message.content)  # type: ignore


def load_builtin_challenges() -> Iterator[type[BuiltinChallenge]]:
    """
    Discover the built-in challenge specs and yield a challenge class for each.

    Every `data.json` file below this module's directory is considered, except
    those rejected by `_challenge_should_be_ignored`. Progress is logged, with
    the final summary emitted once the iterator has been fully consumed.
    """
    logger.info("Loading built-in challenges...")

    challenges_path = os.path.dirname(__file__)
    logger.debug(f"Looking for challenge spec files in {challenges_path}...")

    spec_files = glob.glob(f"{challenges_path}/**/data.json", recursive=True)
    logger.debug(f"Found {len(spec_files)} built-in challenges.")

    loaded = ignored = 0
    for spec_file in spec_files:
        if _challenge_should_be_ignored(spec_file):
            ignored += 1
            continue

        challenge = BuiltinChallenge.from_challenge_spec_file(Path(spec_file))
        logger.debug(f"Generated test for {challenge.info.name}")
        yield challenge

        # Only counted once the consumer has come back for the next challenge
        loaded += 1

    logger.info(
        f"Loading built-in challenges complete: loaded {loaded}, ignored {ignored}."
    )


def _challenge_should_be_ignored(json_file_path: str):
    return (
        "challenges/deprecated" in json_file_path
        or "challenges/library" in json_file_path
    )