Diffstat (limited to 'benchmark/agbenchmark/app.py')
-rw-r--r--  benchmark/agbenchmark/app.py  575
1 file changed, 248 insertions(+), 327 deletions(-)
diff --git a/benchmark/agbenchmark/app.py b/benchmark/agbenchmark/app.py
index ad14cb692..40fee14b6 100644
--- a/benchmark/agbenchmark/app.py
+++ b/benchmark/agbenchmark/app.py
@@ -1,78 +1,81 @@
import datetime
+import glob
+import json
+import logging
+import sys
+import time
import uuid
-from collections import defaultdict, deque
+from collections import deque
+from multiprocessing import Process
from pathlib import Path
+from typing import Optional
import httpx
-
-from agbenchmark.agent_protocol_client import (
- AgentApi,
- ApiClient,
- ApiException,
- Configuration,
+import psutil
+from agent_protocol_client import AgentApi, ApiClient, ApiException, Configuration
+from agent_protocol_client.models import Task, TaskRequestBody
+from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, Extra, ValidationError
+
+from agbenchmark.challenges import ChallengeInfo
+from agbenchmark.config import AgentBenchmarkConfig
+from agbenchmark.reports.processing.report_types_v2 import (
+ BenchmarkRun,
+ Metrics,
+ RepositoryInfo,
+ RunDetails,
+ TaskInfo,
)
-from agbenchmark.reports.processing.report_types_v2 import BenchmarkRun
from agbenchmark.schema import TaskEvalRequestBody
from agbenchmark.utils.utils import write_pretty_json
-configuration = Configuration(host="http://localhost:8000" + "/ap/v1")
-
-import json
-import os
-import sys
-from typing import Any, Optional
-
-import psutil
-from fastapi import APIRouter, FastAPI
-from fastapi import (
- HTTPException as FastAPIHTTPException, # Import HTTPException from FastAPI
-)
-from fastapi import Request, Response
-from fastapi.middleware.cors import CORSMiddleware
-
-from agbenchmark.execute_sub_process import execute_subprocess
-from agbenchmark.schema import Task, TaskRequestBody
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from fastapi import FastAPI
-from pydantic import BaseModel, Extra
-
-router = APIRouter()
-import glob
-
-# Change the current working directory to the benchmark path
-# home_path = find_absolute_benchmark_path()
-# os.chdir(home_path)
+sys.path.append(str(Path(__file__).parent.parent))
-general_command = ["poetry", "run", "agbenchmark", "start", "--backend"]
+logger = logging.getLogger(__name__)
-import psutil
-
-challenges_path = os.path.join(os.path.dirname(__file__), "challenges")
-
-json_files = deque(
+CHALLENGES: dict[str, ChallengeInfo] = {}
+challenges_path = Path(__file__).parent / "challenges"
+challenge_spec_files = deque(
glob.glob(
f"{challenges_path}/**/data.json",
recursive=True,
)
)
-CHALLENGES = {}
-task_informations = defaultdict(dict)
+logger.debug("Loading challenges...")
+while challenge_spec_files:
+ challenge_spec_file = Path(challenge_spec_files.popleft())
+ challenge_relpath = challenge_spec_file.relative_to(challenges_path.parent)
+ if challenge_relpath.is_relative_to("challenges/deprecated"):
+ continue
+
+ logger.debug(f"Loading {challenge_relpath}...")
+ try:
+ challenge_info = ChallengeInfo.parse_file(challenge_spec_file)
+ except ValidationError as e:
+ if logging.getLogger().level == logging.DEBUG:
+ logger.warning(f"Spec file {challenge_relpath} failed to load:\n{e}")
+ logger.debug(f"Invalid challenge spec: {challenge_spec_file.read_text()}")
+ continue
+ challenge_info.spec_file = challenge_spec_file
-while json_files:
- json_file = json_files.popleft()
+ if not challenge_info.eval_id:
+ challenge_info.eval_id = str(uuid.uuid4())
+ # this will sort all the keys of the JSON systematically
+ # so that the order is always the same
+ write_pretty_json(challenge_info.dict(), challenge_spec_file)
- with open(json_file, "r") as file:
- data = json.load(file)
+ CHALLENGES[challenge_info.eval_id] = challenge_info
- if "eval_id" not in data:
- data["eval_id"] = str(uuid.uuid4())
- # this will sort all the keys of the JSON systematically so that the order is always the same
- write_pretty_json(data, json_file)
- # ok
- CHALLENGES[data["eval_id"]] = data
- CHALLENGES[data["eval_id"]]["path"] = json_file
+
+class BenchmarkTaskInfo(BaseModel):
+ task_id: str
+ start_time: datetime.datetime
+ challenge_info: ChallengeInfo
+
+
+task_informations: dict[str, BenchmarkTaskInfo] = {}
def find_agbenchmark_without_uvicorn():
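Editor's note: the rewritten loader above swaps the ad-hoc dict bookkeeping for typed ChallengeInfo models parsed straight from each data.json. A minimal sketch of the same pattern, with SpecInfo standing in for the real ChallengeInfo (which lives in agbenchmark.challenges and carries more fields) and the write-back mirroring write_pretty_json:

    import glob
    import uuid
    from pathlib import Path

    from pydantic import BaseModel, ValidationError

    class SpecInfo(BaseModel):
        # Stand-in for agbenchmark.challenges.ChallengeInfo; only two fields here.
        eval_id: str = ""
        task: str

    def load_specs(challenges_root: Path) -> dict[str, SpecInfo]:
        specs: dict[str, SpecInfo] = {}
        for spec_file in map(Path, glob.glob(f"{challenges_root}/**/data.json", recursive=True)):
            try:
                info = SpecInfo.parse_file(spec_file)
            except ValidationError:
                continue  # skip malformed spec files, as the loop above does
            if not info.eval_id:
                # Assign a stable eval_id and write it back with sorted keys,
                # mirroring what write_pretty_json does in app.py
                info.eval_id = str(uuid.uuid4())
                spec_file.write_text(info.json(indent=4, sort_keys=True))
            specs[info.eval_id] = info
        return specs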
@@ -93,10 +96,10 @@ def find_agbenchmark_without_uvicorn():
):
try:
# Convert the process.info dictionary values to strings and concatenate them
- full_info = " ".join([str(v) for k, v in process.info.items()])
+ full_info = " ".join([str(v) for k, v in process.as_dict().items()])
if "agbenchmark" in full_info and "uvicorn" not in full_info:
- pids.append(process.info["pid"])
+ pids.append(process.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return pids
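Editor's note on the psutil calls touched in this hunk: process_iter() yields Process objects whose info snapshot (or a fresh as_dict()) can be searched for command-line markers, and proc.pid replaces the dict lookup. A self-contained sketch of the same filtering; pids_matching is a hypothetical helper, not part of agbenchmark:

    import psutil

    def pids_matching(needle: str, exclude: str) -> list[int]:
        # Collect PIDs whose name/cmdline mention `needle` but not `exclude`.
        pids: list[int] = []
        for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"]):
            try:
                blob = " ".join(str(v) for v in proc.info.values())
                if needle in blob and exclude not in blob:
                    pids.append(proc.pid)
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return pids

    # pids_matching("agbenchmark", "uvicorn") mirrors find_agbenchmark_without_uvicorn()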
@@ -114,24 +117,12 @@ class CreateReportRequest(BaseModel):
updates_list = []
-updates_list = []
-
-import json
-
origins = [
"http://localhost:8000",
"http://localhost:8080",
"http://127.0.0.1:5000",
"http://localhost:5000",
]
-app = FastAPI()
-app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
-)
def stream_output(pipe):
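Editor's note: the allow-list above stays at module level, while the FastAPI instance and its CORSMiddleware registration move into the setup_fastapi_app factory further down. A quick way to verify the middleware behaves as intended is a preflight request through FastAPI's TestClient; a sketch, assuming the same allow-list:

    from fastapi import FastAPI
    from fastapi.middleware.cors import CORSMiddleware
    from fastapi.testclient import TestClient

    app = FastAPI()
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["http://localhost:8080"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    client = TestClient(app)
    # CORS preflight: the middleware answers OPTIONS before any route is matched
    response = client.options(
        "/ap/v1/reports",
        headers={
            "Origin": "http://localhost:8080",
            "Access-Control-Request-Method": "POST",
        },
    )
    assert response.headers["access-control-allow-origin"] == "http://localhost:8080"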
@@ -139,275 +130,205 @@ def stream_output(pipe):
print(line, end="")
-@router.post("/reports")
-def run_single_test(body: CreateReportRequest) -> Any:
- pids = find_agbenchmark_without_uvicorn()
- print(f"pids already running with agbenchmark: {pids}")
- print(body.dict())
- # it's a hack because other parts of the code are using sys.argv
- print(os.getcwd())
- command_options = ["agbenchmark"]
- # if body.category:
- # sys.argv.append(f"--category={body.category}")
- command_options.append(f"--test={body.test}")
- if body.mock:
- command_options.append("--mock")
-
- execute_subprocess(command_options, 200)
- import json
- from pathlib import Path
-
- print("finished running")
- # List all folders in the current working directory
- path_reports = Path.cwd() / "agbenchmark_config" / "reports"
- folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]
-
- # Sort the folders based on their names
- sorted_folders = sorted(folders, key=lambda x: x.name)
-
- # Get the last folder
- last_folder = sorted_folders[-1] if sorted_folders else None
-
- # Read report.json from this folder
- if last_folder:
- report_path = last_folder / "report.json"
- print(report_path)
- if report_path.exists():
- with report_path.open() as file:
- data = json.load(file)
- print(data)
- else:
- print(f"'report.json' does not exist in '{last_folder}'")
- else:
- print("No folders found.")
-
- return Response(
- content=json.dumps(data),
- status_code=200,
- media_type="application/json",
- )
-
-
-import json
-from typing import Any
+def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
+ from agbenchmark.agent_api_interface import upload_artifacts
+ from agbenchmark.challenges import get_challenge_from_source_uri
+ from agbenchmark.main import run_benchmark
-from fastapi import FastAPI, Request, Response
+ configuration = Configuration(
+ host=agbenchmark_config.host or "http://localhost:8000"
+ )
+ app = FastAPI()
+ app.add_middleware(
+ CORSMiddleware,
+ allow_origins=origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+ router = APIRouter()
+ @router.post("/reports")
+ def run_single_test(body: CreateReportRequest) -> dict:
+ pids = find_agbenchmark_without_uvicorn()
+ logger.info(f"pids already running with agbenchmark: {pids}")
-@router.get("/updates")
-def get_updates(request: Request) -> Any:
- from agbenchmark.__main__ import UPDATES_JSON_PATH
+ logger.debug(f"Request to /reports: {body.dict()}")
- try:
- # Read data from the "update.json" file (provide the correct file path)
- with open(UPDATES_JSON_PATH, "r") as file:
- data = json.load(file)
-
- # Get the last_update_time from the query parameter
- query_param = request.query_params.get("last_update_time")
-
- if query_param is None:
- # Handle the case when last_update_time is not provided
- print("ERROR: last_update_time parameter is missing")
- return Response(
- content=json.dumps({"error": "last_update_time parameter is missing"}),
- status_code=400,
- media_type="application/json",
- headers={"Content-Type": "application/json"},
+ # Start the benchmark in a separate process
+ benchmark_process = Process(
+ target=lambda: run_benchmark(
+ config=agbenchmark_config,
+ tests=(body.test,),
+ mock=body.mock or False,
)
-
- # Convert query_param to a Unix timestamp (assuming it's in seconds as a string)
- query_timestamp = int(query_param)
-
- # Filter the data based on the timestamp (keep timestamps before query_timestamp)
- filtered_data = [item for item in data if item["timestamp"] > query_timestamp]
-
- # Extract only the "content" field from each item
- filtered_data = [item["content"] for item in filtered_data]
-
- # Convert the filtered data to JSON
- filtered_json = json.dumps(filtered_data, indent=2)
-
- print("INFO: Returning filtered data to the client")
- return Response(
- content=filtered_json,
- status_code=200,
- media_type="application/json",
- headers={"Content-Type": "application/json"},
- )
- except FileNotFoundError:
- print("ERROR: File not found: updates.json")
- return Response(
- content=json.dumps({"error": "File not found"}),
- status_code=404,
- media_type="application/json",
- headers={"Content-Type": "application/json"},
)
-
-
-@router.post("/agent/tasks", tags=["agent"], response_model=Task)
-async def create_agent_task(task_eval_request: TaskEvalRequestBody) -> Task:
- """
- Creates a new task using the provided TaskRequestBody and returns a Task.
-
- Args:
- request (Request): FastAPI request object.
- task (TaskRequestBody): The task request containing input and additional input data.
-
- Returns:
- Task: A new task with task_id, input, additional_input, and empty lists for artifacts and steps.
-
- Example:
- Request (TaskRequestBody defined in schema.py):
- {
- "input": "Write the words you receive to the file 'output.txt'.",
- "additional_input": "python/code"
- }
-
- Response (Task defined in schema.py):
- {
- "task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
- "input": "Write the word 'Washington' to a .txt file",
- "additional_input": "python/code",
- "artifacts": [],
- }
- """
- from agbenchmark.agent_api_interface import upload_artifacts
-
- try:
- async with ApiClient(configuration) as api_client:
- api_instance = AgentApi(api_client)
- task_input = CHALLENGES[task_eval_request.eval_id]["task"]
-
- task_request_body = TaskRequestBody(input=task_input)
- task_response = await api_instance.create_agent_task(
- task_request_body=task_request_body
- )
- task_informations[task_response.task_id][
- "benchmark_start_time"
- ] = datetime.datetime.now(datetime.timezone.utc).strftime(
- "%Y-%m-%dT%H:%M:%S+00:00"
+ benchmark_process.start()
+
+ # Wait for the benchmark to finish, with a timeout of 200 seconds
+ timeout = 200
+ start_time = time.time()
+ while benchmark_process.is_alive():
+ if time.time() - start_time > timeout:
+ logger.warning(f"Benchmark run timed out after {timeout} seconds")
+ benchmark_process.terminate()
+ break
+ time.sleep(1)
+ else:
+ logger.debug(f"Benchmark finished running in {time.time() - start_time} s")
+
+ # List all folders in the reports directory
+ path_reports = agbenchmark_config.reports_folder
+ folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]
+
+ # Sort the folders based on their names
+ sorted_folders = sorted(folders, key=lambda x: x.name)
+
+ # Get the last folder
+ latest_folder = sorted_folders[-1] if sorted_folders else None
+
+ # Read report.json from this folder
+ if latest_folder:
+ report_path = latest_folder / "report.json"
+ logger.debug(f"Getting latest report from {report_path}")
+ if report_path.exists():
+ with report_path.open() as file:
+ data = json.load(file)
+ logger.debug(f"Report data: {data}")
+ else:
+ logger.error(
+ "Could not get result after running benchmark: "
+ f"'report.json' does not exist in '{latest_folder}'"
+ )
+ else:
+ logger.error(
+ "Could not get result after running benchmark: no reports found"
)
- task_informations[task_response.task_id][
- "eval_id"
- ] = task_eval_request.eval_id
- await upload_artifacts(
- api_instance,
- str(Path(CHALLENGES[task_eval_request.eval_id]["path"]).parent),
- task_response.task_id,
- "artifacts_in",
+
+ return data
+
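Editor's note: the polling loop in /reports bounds the benchmark subprocess to 200 seconds before terminating it. Under the same assumptions, multiprocessing's Process.join(timeout=...) expresses the identical wait-then-terminate logic a little more compactly; run_with_timeout below is a hypothetical alternative, not the code added here:

    from multiprocessing import Process

    def run_with_timeout(target, timeout: float = 200.0) -> None:
        proc = Process(target=target)
        proc.start()
        proc.join(timeout)       # block for at most `timeout` seconds
        if proc.is_alive():      # still running after the deadline -> kill it
            proc.terminate()
            proc.join()          # reap the terminated child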
+ @router.post("/agent/tasks", tags=["agent"])
+ async def create_agent_task(task_eval_request: TaskEvalRequestBody) -> Task:
+ """
+ Creates a new task using the provided TaskEvalRequestBody and returns a Task.
+
+ Args:
+ task_eval_request: `TaskEvalRequestBody` specifying the eval_id of the challenge to run.
+
+ Returns:
+ Task: A new task with task_id, input, additional_input,
+ and empty lists for artifacts and steps.
+
+ Example:
+ Request (TaskEvalRequestBody defined in schema.py):
+ {
+ ...,
+ "eval_id": "50da533e-3904-4401-8a07-c49adf88b5eb"
+ }
+
+ Response (Task defined in `agent_protocol_client.models`):
+ {
+ "task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
+ "input": "Write the word 'Washington' to a .txt file",
+ "artifacts": []
+ }
+ """
+ try:
+ challenge_info = CHALLENGES[task_eval_request.eval_id]
+ async with ApiClient(configuration) as api_client:
+ api_instance = AgentApi(api_client)
+ task_input = challenge_info.task
+
+ task_request_body = TaskRequestBody(input=task_input)
+ task_response = await api_instance.create_agent_task(
+ task_request_body=task_request_body
+ )
+ task_info = BenchmarkTaskInfo(
+ task_id=task_response.task_id,
+ start_time=datetime.datetime.now(datetime.timezone.utc),
+ challenge_info=challenge_info,
+ )
+ task_informations[task_info.task_id] = task_info
+
+ if input_artifacts_dir := challenge_info.task_artifacts_dir:
+ await upload_artifacts(
+ api_instance,
+ input_artifacts_dir,
+ task_response.task_id,
+ "artifacts_in",
+ )
+ return task_response
+ except ApiException as e:
+ logger.error(f"Error whilst trying to create a task:\n{e}")
+ logger.error(
+ "The above error was caused while processing request: "
+ f"{task_eval_request}"
)
- return Response(
- content=task_response.json(),
- status_code=200,
- media_type="application/json",
+ raise HTTPException(500)
+
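Editor's note: create_agent_task above is essentially one Agent Protocol round-trip plus bookkeeping. A condensed sketch of just the client call, using the agent_protocol_client classes imported at the top of this file; the host value is an assumption:

    import asyncio

    from agent_protocol_client import AgentApi, ApiClient, Configuration
    from agent_protocol_client.models import TaskRequestBody

    async def create_task(task_input: str) -> str:
        configuration = Configuration(host="http://localhost:8000")  # assumed agent host
        async with ApiClient(configuration) as api_client:
            api_instance = AgentApi(api_client)
            task = await api_instance.create_agent_task(
                task_request_body=TaskRequestBody(input=task_input)
            )
            return task.task_id

    # asyncio.run(create_task("Write the word 'Washington' to a .txt file"))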
+ @router.post("/agent/tasks/{task_id}/steps")
+ async def proxy(request: Request, task_id: str):
+ timeout = httpx.Timeout(300.0, read=300.0) # 5 minutes
+ async with httpx.AsyncClient(timeout=timeout) as client:
+ # Construct the new URL
+ new_url = f"{configuration.host}/ap/v1/agent/tasks/{task_id}/steps"
+
+ # Forward the request
+ response = await client.post(
+ new_url,
+ data=await request.body(),
+ headers=dict(request.headers),
)
- except ApiException as e:
- print(f"Error whilst trying to create a task: {task_eval_request}")
- return Response(
- content=json.dumps({"error": "Internal server error"}),
- status_code=500,
- media_type="application/json",
- )
-
-@router.post("/agent/tasks/{task_id}/steps")
-async def proxy(request: Request, task_id: str):
- timeout = httpx.Timeout(300.0, read=300.0) # 5 minutes
- async with httpx.AsyncClient(timeout=timeout) as client:
- # Construct the new URL
- new_url = f"http://localhost:8000/ap/v1/agent/tasks/{task_id}/steps"
-
- # Forward the request
- response = await client.post(
- new_url,
- data=await request.body(),
- headers=dict(request.headers),
- )
+ # Return the response from the forwarded request
+ return Response(content=response.content, status_code=response.status_code)
- # Return the response from the forwarded request
- return Response(content=response.content, status_code=response.status_code)
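Editor's note: the /steps route is a thin reverse proxy; it re-sends the incoming body and headers to the agent and relays the agent's reply unchanged. A generic sketch of that forwarding pattern with httpx (upstream is a placeholder; content= is used here for the raw body, where the route above passes data=):

    import httpx
    from fastapi import Request, Response

    async def forward(request: Request, upstream: str) -> Response:
        # Relay the request body/headers to `upstream` and mirror the reply back.
        timeout = httpx.Timeout(300.0, read=300.0)  # generous limit for slow agent steps
        async with httpx.AsyncClient(timeout=timeout) as client:
            upstream_response = await client.post(
                upstream,
                content=await request.body(),
                headers=dict(request.headers),
            )
        return Response(
            content=upstream_response.content,
            status_code=upstream_response.status_code,
        )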
-
-
-@router.post("/agent/tasks/{task_id}/evaluations")
-async def create_evaluation(task_id: str) -> deque:
- from agbenchmark.__main__ import TEMP_FOLDER_ABS_PATH
- from agbenchmark.agent_api_interface import copy_agent_artifacts_into_temp_folder
- from agbenchmark.agent_interface import copy_artifacts_into_temp_folder
- from agbenchmark.generate_test import create_challenge
+ @router.post("/agent/tasks/{task_id}/evaluations")
+ async def create_evaluation(task_id: str) -> BenchmarkRun:
+ task_info = task_informations[task_id]
+ challenge = get_challenge_from_source_uri(task_info.challenge_info.source_uri)
+ try:
+ async with ApiClient(configuration) as api_client:
+ api_instance = AgentApi(api_client)
+ eval_results = await challenge.evaluate_task_state(
+ api_instance, task_id
+ )
+
+ eval_info = BenchmarkRun(
+ repository_info=RepositoryInfo(),
+ run_details=RunDetails(
+ command=f"agbenchmark --test={challenge.info.name}",
+ benchmark_start_time=(
+ task_info.start_time.strftime("%Y-%m-%dT%H:%M:%S+00:00")
+ ),
+ test_name=challenge.info.name,
+ ),
+ task_info=TaskInfo(
+ data_path=challenge.info.source_uri,
+ is_regression=None,
+ category=[c.value for c in challenge.info.category],
+ task=challenge.info.task,
+ answer=challenge.info.reference_answer or "",
+ description=challenge.info.description or "",
+ ),
+ metrics=Metrics(
+ success=all(e.passed for e in eval_results),
+ success_percentage=(
+ 100 * sum(e.score for e in eval_results) / len(eval_results)
+ if eval_results # avoid division by 0
+ else 0
+ ),
+ attempted=True,
+ ),
+ config={},
+ )
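Editor's note: the Metrics block reduces the per-criterion evaluation results to two numbers: success is the conjunction of the passed flags, and success_percentage is the mean score scaled to 100. A tiny worked sketch, with EvalResult standing in for the real result type (only the two fields used here):

    from dataclasses import dataclass

    @dataclass
    class EvalResult:
        passed: bool
        score: float

    eval_results = [EvalResult(passed=True, score=1.0), EvalResult(passed=False, score=0.5)]
    success = all(e.passed for e in eval_results)        # False: one criterion failed
    success_percentage = (
        100 * sum(e.score for e in eval_results) / len(eval_results)  # 75.0
        if eval_results  # avoid division by zero, as above
        else 0
    )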
- try:
- async with ApiClient(configuration) as api_client:
- api_instance = AgentApi(api_client)
- await copy_agent_artifacts_into_temp_folder(api_instance, task_id)
- # add custom python
- data = CHALLENGES[task_informations[task_id]["eval_id"]]
-
- artifact_path = str(Path(data["path"]).parent)
- copy_artifacts_into_temp_folder(
- TEMP_FOLDER_ABS_PATH, "custom_python", artifact_path
- )
- json_file = CHALLENGES[task_informations[task_id]["eval_id"]]["path"]
- json_files = deque()
-
- _, challenge_class = create_challenge(data, json_file, json_files)
- challenge_instance = challenge_class()
- scores = challenge_instance.get_scores(config={})
- test_name = "Test" + data["name"]
- is_score_100 = 1 in scores["values"]
-
- info_details = {
- "repository_info": {
- "repo_url": None,
- "team_name": None,
- "benchmark_git_commit_sha": None,
- "agent_git_commit_sha": None,
- },
- "run_details": {
- "run_id": None,
- "command": "agbenchmark" + " --test=" + test_name,
- "completion_time": None,
- "benchmark_start_time": task_informations[task_id][
- "benchmark_start_time"
- ],
- "test_name": data["name"],
- },
- "task_info": {
- "data_path": data["path"].split("benchmark/", 1)[-1],
- "is_regression": None,
- "category": data["category"],
- "task": data["task"],
- "answer": data["ground"]["answer"],
- "description": data["info"]["description"],
- },
- "metrics": {
- "difficulty": None,
- "success": is_score_100,
- "attempted": True,
- "success_percentage": None,
- "cost": None,
- "run_time": None,
- },
- "reached_cutoff": None,
- "config": {},
- }
-
- BenchmarkRun.parse_obj(info_details)
-
- print(json.dumps(info_details, indent=4))
- return Response(
- content=json.dumps(info_details),
- status_code=200,
- media_type="application/json",
- )
- except ApiException as e:
- print(f"Error whilst trying to evaluate the task: {task_id}")
- return Response(
- content=json.dumps({"error": "Internal server error"}),
- status_code=500,
- media_type="application/json",
- )
- # path = Path(json_file).resolve()
+ logger.debug(f"Returning evaluation data:\n{eval_info.json(indent=4)}")
+ return eval_info
+ except ApiException as e:
+ logger.error(f"Error {e} whilst trying to evaluate task: {task_id}")
+ raise HTTPException(500)
+ app.include_router(router, prefix="/ap/v1")
-app.include_router(router, prefix="/ap/v1")
+ return app
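Editor's note: with the module-level app gone, setup_fastapi_app is the single entry point that wires an AgentBenchmarkConfig into the API. A hedged usage sketch; the default-constructed config and the port are assumptions, and the real CLI wiring lives elsewhere in agbenchmark:

    import uvicorn

    from agbenchmark.app import setup_fastapi_app
    from agbenchmark.config import AgentBenchmarkConfig

    if __name__ == "__main__":
        config = AgentBenchmarkConfig()        # assumes defaults suffice for a local run
        app = setup_fastapi_app(config)
        # Port 8080 matches one of the origins in the CORS allow-list above
        uvicorn.run(app, host="localhost", port=8080)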