Diffstat (limited to 'benchmark/agbenchmark/reports/ReportManager.py')
-rw-r--r--  benchmark/agbenchmark/reports/ReportManager.py  213
1 file changed, 213 insertions(+), 0 deletions(-)
diff --git a/benchmark/agbenchmark/reports/ReportManager.py b/benchmark/agbenchmark/reports/ReportManager.py
new file mode 100644
index 000000000..d04beee43
--- /dev/null
+++ b/benchmark/agbenchmark/reports/ReportManager.py
@@ -0,0 +1,213 @@
+import copy
+import json
+import logging
+import os
+import sys
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+from agbenchmark.config import AgentBenchmarkConfig
+from agbenchmark.reports.processing.graphs import save_single_radar_chart
+from agbenchmark.reports.processing.process_report import (
+ get_highest_achieved_difficulty_per_category,
+)
+from agbenchmark.reports.processing.report_types import MetricsOverall, Report, Test
+from agbenchmark.utils.utils import get_highest_success_difficulty
+
+logger = logging.getLogger(__name__)
+
+
+class SingletonReportManager:
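+    """Singleton that instantiates and holds the report managers for a benchmark run."""
+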
+ instance = None
+
+ INFO_MANAGER: "SessionReportManager"
+ REGRESSION_MANAGER: "RegressionTestsTracker"
+ SUCCESS_RATE_TRACKER: "SuccessRatesTracker"
+
+ def __new__(cls):
+ if not cls.instance:
+ cls.instance = super(SingletonReportManager, cls).__new__(cls)
+
+ agent_benchmark_config = AgentBenchmarkConfig.load()
+            # Record the benchmark start time; it determines the report directory
+            benchmark_start_time_dt = datetime.now(timezone.utc)
+
+ # Make the Managers class attributes
+ cls.INFO_MANAGER = SessionReportManager(
+ agent_benchmark_config.get_report_dir(benchmark_start_time_dt)
+ / "report.json",
+ benchmark_start_time_dt,
+ )
+ cls.REGRESSION_MANAGER = RegressionTestsTracker(
+ agent_benchmark_config.regression_tests_file
+ )
+ cls.SUCCESS_RATE_TRACKER = SuccessRatesTracker(
+ agent_benchmark_config.success_rate_file
+ )
+
+ return cls.instance
+
+ @classmethod
+ def clear_instance(cls):
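+        """Drop the singleton and its managers; the next instantiation recreates them."""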
+ cls.instance = None
+ cls.INFO_MANAGER = None
+ cls.REGRESSION_MANAGER = None
+ cls.SUCCESS_RATE_TRACKER = None
+
+
+class BaseReportManager:
+ """Abstracts interaction with the regression tests file"""
+
+ tests: dict[str, Any]
+
+ def __init__(self, report_file: Path):
+ self.report_file = report_file
+
+ self.load()
+
+ def load(self) -> None:
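+        """Load the report file into self.tests, tolerating a missing or invalid file."""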
+ if not self.report_file.exists():
+            self.report_file.parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ with self.report_file.open("r") as f:
+ data = json.load(f)
+ self.tests = {k: data[k] for k in sorted(data)}
+ except FileNotFoundError:
+ self.tests = {}
+ except json.decoder.JSONDecodeError as e:
+ logger.warning(f"Could not parse {self.report_file}: {e}")
+ self.tests = {}
+
+ def save(self) -> None:
+ with self.report_file.open("w") as f:
+ json.dump(self.tests, f, indent=4)
+
+ def remove_test(self, test_name: str) -> None:
+ if test_name in self.tests:
+ del self.tests[test_name]
+ self.save()
+
+ def reset(self) -> None:
+ self.tests = {}
+ self.save()
+
+
+class SessionReportManager(BaseReportManager):
+ """Abstracts interaction with the regression tests file"""
+
+ tests: dict[str, Test] | Report
+
+ def __init__(self, report_file: Path, benchmark_start_time: datetime):
+ super().__init__(report_file)
+
+ self.start_time = time.time()
+ self.benchmark_start_time = benchmark_start_time
+
+ def save(self) -> None:
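+        # Write either the finalized Report model or the in-progress
+        # mapping of test name -> Test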
+ with self.report_file.open("w") as f:
+ if isinstance(self.tests, Report):
+ f.write(self.tests.json(indent=4))
+ else:
+ json.dump({k: v.dict() for k, v in self.tests.items()}, f, indent=4)
+
+ def load(self) -> None:
+ super().load()
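+        # A finalized session report has a top-level "tests" key;
+        # otherwise the file is a flat mapping of test name -> Test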
+ if "tests" in self.tests: # type: ignore
+ self.tests = Report.parse_obj(self.tests)
+ else:
+ self.tests = {n: Test.parse_obj(d) for n, d in self.tests.items()}
+
+ def add_test_report(self, test_name: str, test_report: Test) -> None:
+ if isinstance(self.tests, Report):
+ raise RuntimeError("Session report already finalized")
+
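+        # Strip the "Test" prefix used for pytest collection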
+ if test_name.startswith("Test"):
+ test_name = test_name[4:]
+ self.tests[test_name] = test_report
+
+ self.save()
+
+ def finalize_session_report(self, config: AgentBenchmarkConfig) -> None:
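+        """Compile the collected test reports into a final Report and save it."""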
+ command = " ".join(sys.argv)
+
+ if isinstance(self.tests, Report):
+ raise RuntimeError("Session report already finalized")
+
+ self.tests = Report(
+ command=command.split(os.sep)[-1],
+ benchmark_git_commit_sha="---",
+ agent_git_commit_sha="---",
+ completion_time=datetime.now(timezone.utc).strftime(
+ "%Y-%m-%dT%H:%M:%S+00:00"
+ ),
+ benchmark_start_time=self.benchmark_start_time.strftime(
+ "%Y-%m-%dT%H:%M:%S+00:00"
+ ),
+ metrics=MetricsOverall(
+ run_time=str(round(time.time() - self.start_time, 2)) + " seconds",
+ highest_difficulty=get_highest_success_difficulty(self.tests),
+ total_cost=self.get_total_costs(),
+ ),
+ tests=copy.copy(self.tests),
+ config=config.dict(exclude_none=True),
+ )
+
+ agent_categories = get_highest_achieved_difficulty_per_category(self.tests)
+ if len(agent_categories) > 1:
+ save_single_radar_chart(
+ agent_categories,
+ config.get_report_dir(self.benchmark_start_time) / "radar_chart.png",
+ )
+
+ self.save()
+
+    def get_total_costs(self) -> float | None:
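+        """Sum the costs of all test results; None if no result reported a cost."""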
+ if isinstance(self.tests, Report):
+ tests = self.tests.tests
+ else:
+ tests = self.tests
+
+        total_cost = 0
+        all_costs_none = True
+        for test_data in tests.values():
+            cost = sum(r.cost or 0 for r in test_data.results)
+
+            # sum() always yields a number, so inspect the individual results
+            # to see whether any cost was actually reported
+            if any(r.cost is not None for r in test_data.results):
+                all_costs_none = False
+            total_cost += cost
+        if all_costs_none:
+            total_cost = None
+        return total_cost
+
+
+class RegressionTestsTracker(BaseReportManager):
+ """Abstracts interaction with the regression tests file"""
+
+ tests: dict[str, dict]
+
+ def add_test(self, test_name: str, test_details: dict) -> None:
+ if test_name.startswith("Test"):
+ test_name = test_name[4:]
+
+ self.tests[test_name] = test_details
+ self.save()
+
+ def has_regression_test(self, test_name: str) -> bool:
+ return self.tests.get(test_name) is not None
+
+
+class SuccessRatesTracker(BaseReportManager):
+ """Abstracts interaction with the regression tests file"""
+
+ tests: dict[str, list[bool | None]]
+
+ def update(self, test_name: str, success_history: list[bool | None]) -> None:
+ if test_name.startswith("Test"):
+ test_name = test_name[4:]
+
+ self.tests[test_name] = success_history
+ self.save()