Diffstat (limited to 'benchmark/agbenchmark/app.py')
-rw-r--r-- | benchmark/agbenchmark/app.py | 575
1 file changed, 248 insertions, 327 deletions
diff --git a/benchmark/agbenchmark/app.py b/benchmark/agbenchmark/app.py
index ad14cb692..40fee14b6 100644
--- a/benchmark/agbenchmark/app.py
+++ b/benchmark/agbenchmark/app.py
@@ -1,78 +1,81 @@
 import datetime
+import glob
+import json
+import logging
+import sys
+import time
 import uuid
-from collections import defaultdict, deque
+from collections import deque
+from multiprocessing import Process
 from pathlib import Path
+from typing import Optional
 
 import httpx
-
-from agbenchmark.agent_protocol_client import (
-    AgentApi,
-    ApiClient,
-    ApiException,
-    Configuration,
+import psutil
+from agent_protocol_client import AgentApi, ApiClient, ApiException, Configuration
+from agent_protocol_client.models import Task, TaskRequestBody
+from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, Extra, ValidationError
+
+from agbenchmark.challenges import ChallengeInfo
+from agbenchmark.config import AgentBenchmarkConfig
+from agbenchmark.reports.processing.report_types_v2 import (
+    BenchmarkRun,
+    Metrics,
+    RepositoryInfo,
+    RunDetails,
+    TaskInfo,
 )
-from agbenchmark.reports.processing.report_types_v2 import BenchmarkRun
 from agbenchmark.schema import TaskEvalRequestBody
 from agbenchmark.utils.utils import write_pretty_json
 
-configuration = Configuration(host="http://localhost:8000" + "/ap/v1")
-
-import json
-import os
-import sys
-from typing import Any, Optional
-
-import psutil
-from fastapi import APIRouter, FastAPI
-from fastapi import (
-    HTTPException as FastAPIHTTPException,  # Import HTTPException from FastAPI
-)
-from fastapi import Request, Response
-from fastapi.middleware.cors import CORSMiddleware
-
-from agbenchmark.execute_sub_process import execute_subprocess
-from agbenchmark.schema import Task, TaskRequestBody
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from fastapi import FastAPI
-from pydantic import BaseModel, Extra
-
-router = APIRouter()
-import glob
-
-# Change the current working directory to the benchmark path
-# home_path = find_absolute_benchmark_path()
-# os.chdir(home_path)
+sys.path.append(str(Path(__file__).parent.parent))
 
-general_command = ["poetry", "run", "agbenchmark", "start", "--backend"]
+logger = logging.getLogger(__name__)
 
-import psutil
-
-challenges_path = os.path.join(os.path.dirname(__file__), "challenges")
-
-json_files = deque(
+CHALLENGES: dict[str, ChallengeInfo] = {}
+challenges_path = Path(__file__).parent / "challenges"
+challenge_spec_files = deque(
     glob.glob(
         f"{challenges_path}/**/data.json",
         recursive=True,
    )
 )
-CHALLENGES = {}
-task_informations = defaultdict(dict)
 
+logger.debug("Loading challenges...")
+while challenge_spec_files:
+    challenge_spec_file = Path(challenge_spec_files.popleft())
+    challenge_relpath = challenge_spec_file.relative_to(challenges_path.parent)
+    if challenge_relpath.is_relative_to("challenges/deprecated"):
+        continue
+
+    logger.debug(f"Loading {challenge_relpath}...")
+    try:
+        challenge_info = ChallengeInfo.parse_file(challenge_spec_file)
+    except ValidationError as e:
+        if logging.getLogger().level == logging.DEBUG:
+            logger.warning(f"Spec file {challenge_relpath} failed to load:\n{e}")
+            logger.debug(f"Invalid challenge spec: {challenge_spec_file.read_text()}")
+        continue
+    challenge_info.spec_file = challenge_spec_file
 
-while json_files:
-    json_file = json_files.popleft()
+    if not challenge_info.eval_id:
+        challenge_info.eval_id = str(uuid.uuid4())
+        # this will sort all the keys of the JSON systematically
+        # so that the order is always the same
+        write_pretty_json(challenge_info.dict(), challenge_spec_file)
 
-    with open(json_file, "r") as file:
-        data = json.load(file)
+    CHALLENGES[challenge_info.eval_id] = challenge_info
 
-    if "eval_id" not in data:
-        data["eval_id"] = str(uuid.uuid4())
-    # this will sort all the keys of the JSON systematically so that the order is always the same
-    write_pretty_json(data, json_file)
-    # ok
-    CHALLENGES[data["eval_id"]] = data
-    CHALLENGES[data["eval_id"]]["path"] = json_file
+
+class BenchmarkTaskInfo(BaseModel):
+    task_id: str
+    start_time: datetime.datetime
+    challenge_info: ChallengeInfo
+
+
+task_informations: dict[str, BenchmarkTaskInfo] = {}
 
 
 def find_agbenchmark_without_uvicorn():
@@ -93,10 +96,10 @@ def find_agbenchmark_without_uvicorn():
     ):
         try:
             # Convert the process.info dictionary values to strings and concatenate them
-            full_info = " ".join([str(v) for k, v in process.info.items()])
+            full_info = " ".join([str(v) for k, v in process.as_dict().items()])
 
             if "agbenchmark" in full_info and "uvicorn" not in full_info:
-                pids.append(process.info["pid"])
+                pids.append(process.pid)
         except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
             pass
     return pids
@@ -114,24 +117,12 @@ class CreateReportRequest(BaseModel):
 
 updates_list = []
 
-updates_list = []
-
-import json
-
 origins = [
     "http://localhost:8000",
     "http://localhost:8080",
     "http://127.0.0.1:5000",
     "http://localhost:5000",
 ]
-app = FastAPI()
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=origins,
-    allow_credentials=True,
-    allow_methods=["*"],
-    allow_headers=["*"],
-)
 
 
 def stream_output(pipe):
@@ -139,275 +130,205 @@ def stream_output(pipe):
         print(line, end="")
 
 
-@router.post("/reports")
-def run_single_test(body: CreateReportRequest) -> Any:
-    pids = find_agbenchmark_without_uvicorn()
-    print(f"pids already running with agbenchmark: {pids}")
-    print(body.dict())
-    # it's a hack because other parts of the code are using sys.argv
-    print(os.getcwd())
-    command_options = ["agbenchmark"]
-    # if body.category:
-    #     sys.argv.append(f"--category={body.category}")
-    command_options.append(f"--test={body.test}")
-    if body.mock:
-        command_options.append("--mock")
-
-    execute_subprocess(command_options, 200)
-    import json
-    from pathlib import Path
-
-    print("finished running")
-    # List all folders in the current working directory
-    path_reports = Path.cwd() / "agbenchmark_config" / "reports"
-    folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]
-
-    # Sort the folders based on their names
-    sorted_folders = sorted(folders, key=lambda x: x.name)
-
-    # Get the last folder
-    last_folder = sorted_folders[-1] if sorted_folders else None
-
-    # Read report.json from this folder
-    if last_folder:
-        report_path = last_folder / "report.json"
-        print(report_path)
-        if report_path.exists():
-            with report_path.open() as file:
-                data = json.load(file)
-            print(data)
-        else:
-            print(f"'report.json' does not exist in '{last_folder}'")
-    else:
-        print("No folders found.")
-
-    return Response(
-        content=json.dumps(data),
-        status_code=200,
-        media_type="application/json",
-    )
-
-
-import json
-from typing import Any
+def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
+    from agbenchmark.agent_api_interface import upload_artifacts
+    from agbenchmark.challenges import get_challenge_from_source_uri
+    from agbenchmark.main import run_benchmark
 
-from fastapi import FastAPI, Request, Response
+    configuration = Configuration(
+        host=agbenchmark_config.host or "http://localhost:8000"
+    )
+    app = FastAPI()
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=origins,
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    router = APIRouter()
 
+    @router.post("/reports")
+    def run_single_test(body: CreateReportRequest) -> dict:
+        pids = find_agbenchmark_without_uvicorn()
+        logger.info(f"pids already running with agbenchmark: {pids}")
 
-@router.get("/updates")
-def get_updates(request: Request) -> Any:
-    from agbenchmark.__main__ import UPDATES_JSON_PATH
+        logger.debug(f"Request to /reports: {body.dict()}")
 
-    try:
-        # Read data from the "update.json" file (provide the correct file path)
-        with open(UPDATES_JSON_PATH, "r") as file:
-            data = json.load(file)
-
-        # Get the last_update_time from the query parameter
-        query_param = request.query_params.get("last_update_time")
-
-        if query_param is None:
-            # Handle the case when last_update_time is not provided
-            print("ERROR: last_update_time parameter is missing")
-            return Response(
-                content=json.dumps({"error": "last_update_time parameter is missing"}),
-                status_code=400,
-                media_type="application/json",
-                headers={"Content-Type": "application/json"},
+        # Start the benchmark in a separate thread
+        benchmark_process = Process(
+            target=lambda: run_benchmark(
+                config=agbenchmark_config,
+                tests=(body.test,),
+                mock=body.mock or False,
             )
-
-        # Convert query_param to a Unix timestamp (assuming it's in seconds as a string)
-        query_timestamp = int(query_param)
-
-        # Filter the data based on the timestamp (keep timestamps before query_timestamp)
-        filtered_data = [item for item in data if item["timestamp"] > query_timestamp]
-
-        # Extract only the "content" field from each item
-        filtered_data = [item["content"] for item in filtered_data]
-
-        # Convert the filtered data to JSON
-        filtered_json = json.dumps(filtered_data, indent=2)
-
-        print("INFO: Returning filtered data to the client")
-        return Response(
-            content=filtered_json,
-            status_code=200,
-            media_type="application/json",
-            headers={"Content-Type": "application/json"},
-        )
-    except FileNotFoundError:
-        print("ERROR: File not found: updates.json")
-        return Response(
-            content=json.dumps({"error": "File not found"}),
-            status_code=404,
-            media_type="application/json",
-            headers={"Content-Type": "application/json"},
         )
-
-
-@router.post("/agent/tasks", tags=["agent"], response_model=Task)
-async def create_agent_task(task_eval_request: TaskEvalRequestBody) -> Task:
-    """
-    Creates a new task using the provided TaskRequestBody and returns a Task.
-
-    Args:
-        request (Request): FastAPI request object.
-        task (TaskRequestBody): The task request containing input and additional input data.
-
-    Returns:
-        Task: A new task with task_id, input, additional_input, and empty lists for artifacts and steps.
-
-    Example:
-        Request (TaskRequestBody defined in schema.py):
-        {
-            "input": "Write the words you receive to the file 'output.txt'.",
-            "additional_input": "python/code"
-        }
-
-        Response (Task defined in schema.py):
-        {
-            "task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
-            "input": "Write the word 'Washington' to a .txt file",
-            "additional_input": "python/code",
-            "artifacts": [],
-        }
-    """
-    from agbenchmark.agent_api_interface import upload_artifacts
-
-    try:
-        async with ApiClient(configuration) as api_client:
-            api_instance = AgentApi(api_client)
-            task_input = CHALLENGES[task_eval_request.eval_id]["task"]
-
-            task_request_body = TaskRequestBody(input=task_input)
-            task_response = await api_instance.create_agent_task(
-                task_request_body=task_request_body
-            )
-            task_informations[task_response.task_id][
-                "benchmark_start_time"
-            ] = datetime.datetime.now(datetime.timezone.utc).strftime(
-                "%Y-%m-%dT%H:%M:%S+00:00"
+        benchmark_process.start()
+
+        # Wait for the benchmark to finish, with a timeout of 200 seconds
+        timeout = 200
+        start_time = time.time()
+        while benchmark_process.is_alive():
+            if time.time() - start_time > timeout:
+                logger.warning(f"Benchmark run timed out after {timeout} seconds")
+                benchmark_process.terminate()
+                break
+            time.sleep(1)
+        else:
+            logger.debug(f"Benchmark finished running in {time.time() - start_time} s")
+
+        # List all folders in the current working directory
+        path_reports = agbenchmark_config.reports_folder
+        folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]
+
+        # Sort the folders based on their names
+        sorted_folders = sorted(folders, key=lambda x: x.name)
+
+        # Get the last folder
+        latest_folder = sorted_folders[-1] if sorted_folders else None
+
+        # Read report.json from this folder
+        if latest_folder:
+            report_path = latest_folder / "report.json"
+            logger.debug(f"Getting latest report from {report_path}")
+            if report_path.exists():
+                with report_path.open() as file:
+                    data = json.load(file)
+                logger.debug(f"Report data: {data}")
+            else:
+                logger.error(
+                    "Could not get result after running benchmark: "
+                    f"'report.json' does not exist in '{latest_folder}'"
+                )
+        else:
+            logger.error(
+                "Could not get result after running benchmark: no reports found"
             )
-            task_informations[task_response.task_id][
-                "eval_id"
-            ] = task_eval_request.eval_id
-            await upload_artifacts(
-                api_instance,
-                str(Path(CHALLENGES[task_eval_request.eval_id]["path"]).parent),
-                task_response.task_id,
-                "artifacts_in",
+
+        return data
+
+    @router.post("/agent/tasks", tags=["agent"])
+    async def create_agent_task(task_eval_request: TaskEvalRequestBody) -> Task:
+        """
+        Creates a new task using the provided TaskEvalRequestBody and returns a Task.
+
+        Args:
+            task_eval_request: `TaskRequestBody` including an eval_id.
+
+        Returns:
+            Task: A new task with task_id, input, additional_input,
+                and empty lists for artifacts and steps.
+
+        Example:
+            Request (TaskEvalRequestBody defined in schema.py):
+            {
+                ...,
+                "eval_id": "50da533e-3904-4401-8a07-c49adf88b5eb"
+            }
+
+            Response (Task defined in `agent_protocol_client.models`):
+            {
+                "task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
+                "input": "Write the word 'Washington' to a .txt file",
+                "artifacts": []
+            }
+        """
+        try:
+            challenge_info = CHALLENGES[task_eval_request.eval_id]
+            async with ApiClient(configuration) as api_client:
+                api_instance = AgentApi(api_client)
+                task_input = challenge_info.task
+
+                task_request_body = TaskRequestBody(input=task_input)
+                task_response = await api_instance.create_agent_task(
+                    task_request_body=task_request_body
+                )
+                task_info = BenchmarkTaskInfo(
+                    task_id=task_response.task_id,
+                    start_time=datetime.datetime.now(datetime.timezone.utc),
+                    challenge_info=challenge_info,
+                )
+                task_informations[task_info.task_id] = task_info
+
+                if input_artifacts_dir := challenge_info.task_artifacts_dir:
+                    await upload_artifacts(
+                        api_instance,
+                        input_artifacts_dir,
+                        task_response.task_id,
+                        "artifacts_in",
+                    )
+                return task_response
+        except ApiException as e:
+            logger.error(f"Error whilst trying to create a task:\n{e}")
+            logger.error(
+                "The above error was caused while processing request: "
+                f"{task_eval_request}"
             )
-            return Response(
-                content=task_response.json(),
-                status_code=200,
-                media_type="application/json",
+            raise HTTPException(500)
+
+    @router.post("/agent/tasks/{task_id}/steps")
+    async def proxy(request: Request, task_id: str):
+        timeout = httpx.Timeout(300.0, read=300.0)  # 5 minutes
+        async with httpx.AsyncClient(timeout=timeout) as client:
+            # Construct the new URL
+            new_url = f"{configuration.host}/ap/v1/agent/tasks/{task_id}/steps"
+
+            # Forward the request
+            response = await client.post(
+                new_url,
+                data=await request.body(),
+                headers=dict(request.headers),
             )
-    except ApiException as e:
-        print(f"Error whilst trying to create a task: {task_eval_request}")
-        return Response(
-            content=json.dumps({"error": "Internal server error"}),
-            status_code=500,
-            media_type="application/json",
-        )
-
-@router.post("/agent/tasks/{task_id}/steps")
-async def proxy(request: Request, task_id: str):
-    timeout = httpx.Timeout(300.0, read=300.0)  # 5 minutes
-    async with httpx.AsyncClient(timeout=timeout) as client:
-        # Construct the new URL
-        new_url = f"http://localhost:8000/ap/v1/agent/tasks/{task_id}/steps"
-
-        # Forward the request
-        response = await client.post(
-            new_url,
-            data=await request.body(),
-            headers=dict(request.headers),
-        )
+            # Return the response from the forwarded request
+            return Response(content=response.content, status_code=response.status_code)
 
-        # Return the response from the forwarded request
-        return Response(content=response.content, status_code=response.status_code)
-
-
-@router.post("/agent/tasks/{task_id}/evaluations")
-async def create_evaluation(task_id: str) -> deque:
-    from agbenchmark.__main__ import TEMP_FOLDER_ABS_PATH
-    from agbenchmark.agent_api_interface import copy_agent_artifacts_into_temp_folder
-    from agbenchmark.agent_interface import copy_artifacts_into_temp_folder
-    from agbenchmark.generate_test import create_challenge
+    @router.post("/agent/tasks/{task_id}/evaluations")
+    async def create_evaluation(task_id: str) -> BenchmarkRun:
+        task_info = task_informations[task_id]
+        challenge = get_challenge_from_source_uri(task_info.challenge_info.source_uri)
+        try:
+            async with ApiClient(configuration) as api_client:
+                api_instance = AgentApi(api_client)
+                eval_results = await challenge.evaluate_task_state(
+                    api_instance, task_id
+                )
+
+            eval_info = BenchmarkRun(
+                repository_info=RepositoryInfo(),
+                run_details=RunDetails(
+                    command=f"agbenchmark --test={challenge.info.name}",
+                    benchmark_start_time=(
+                        task_info.start_time.strftime("%Y-%m-%dT%H:%M:%S+00:00")
+                    ),
+                    test_name=challenge.info.name,
+                ),
+                task_info=TaskInfo(
+                    data_path=challenge.info.source_uri,
+                    is_regression=None,
+                    category=[c.value for c in challenge.info.category],
+                    task=challenge.info.task,
+                    answer=challenge.info.reference_answer or "",
+                    description=challenge.info.description or "",
+                ),
+                metrics=Metrics(
+                    success=all(e.passed for e in eval_results),
+                    success_percentage=(
+                        100 * sum(e.score for e in eval_results) / len(eval_results)
+                        if eval_results  # avoid division by 0
+                        else 0
+                    ),
+                    attempted=True,
+                ),
+                config={},
+            )
 
-    try:
-        async with ApiClient(configuration) as api_client:
-            api_instance = AgentApi(api_client)
-            await copy_agent_artifacts_into_temp_folder(api_instance, task_id)
-            # add custom python
-            data = CHALLENGES[task_informations[task_id]["eval_id"]]
-
-            artifact_path = str(Path(data["path"]).parent)
-            copy_artifacts_into_temp_folder(
-                TEMP_FOLDER_ABS_PATH, "custom_python", artifact_path
-            )
-            json_file = CHALLENGES[task_informations[task_id]["eval_id"]]["path"]
-            json_files = deque()
-
-            _, challenge_class = create_challenge(data, json_file, json_files)
-            challenge_instance = challenge_class()
-            scores = challenge_instance.get_scores(config={})
-            test_name = "Test" + data["name"]
-            is_score_100 = 1 in scores["values"]
-
-            info_details = {
-                "repository_info": {
-                    "repo_url": None,
-                    "team_name": None,
-                    "benchmark_git_commit_sha": None,
-                    "agent_git_commit_sha": None,
-                },
-                "run_details": {
-                    "run_id": None,
-                    "command": "agbenchmark" + " --test=" + test_name,
-                    "completion_time": None,
-                    "benchmark_start_time": task_informations[task_id][
-                        "benchmark_start_time"
-                    ],
-                    "test_name": data["name"],
-                },
-                "task_info": {
-                    "data_path": data["path"].split("benchmark/", 1)[-1],
-                    "is_regression": None,
-                    "category": data["category"],
-                    "task": data["task"],
-                    "answer": data["ground"]["answer"],
-                    "description": data["info"]["description"],
-                },
-                "metrics": {
-                    "difficulty": None,
-                    "success": is_score_100,
-                    "attempted": True,
-                    "success_percentage": None,
-                    "cost": None,
-                    "run_time": None,
-                },
-                "reached_cutoff": None,
-                "config": {},
-            }
-
-            BenchmarkRun.parse_obj(info_details)
-
-            print(json.dumps(info_details, indent=4))
-            return Response(
-                content=json.dumps(info_details),
-                status_code=200,
-                media_type="application/json",
-            )
-    except ApiException as e:
-        print(f"Error whilst trying to evaluate the task: {task_id}")
-        return Response(
-            content=json.dumps({"error": "Internal server error"}),
-            status_code=500,
-            media_type="application/json",
-        )
-    # path = Path(json_file).resolve()
+            logger.debug(f"Returning evaluation data:\n{eval_info.json(indent=4)}")
+            return eval_info
+        except ApiException as e:
+            logger.error(f"Error {e} whilst trying to evaluate task: {task_id}")
+            raise HTTPException(500)
 
+    app.include_router(router, prefix="/ap/v1")
-app.include_router(router, prefix="/ap/v1")
+
+    return app