1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
import copy
import json
import os
import sys
import time
from datetime import datetime, timezone
from agbenchmark.reports.processing.graphs import save_single_radar_chart
from agbenchmark.reports.processing.process_report import get_agent_category
from agbenchmark.reports.processing.report_types import Report
from agbenchmark.utils.data_types import AgentBenchmarkConfig
from agbenchmark.utils.utils import get_highest_success_difficulty
class SingletonReportManager:
    """Lazily-built singleton that exposes the benchmark's report managers.

    The first ``SingletonReportManager()`` call creates the single instance
    and attaches three ``ReportManager`` objects to the class itself
    (``REGRESSION_MANAGER``, ``INFO_MANAGER`` and ``INTERNAL_INFO_MANAGER``),
    all sharing one UTC start timestamp. Later calls return that same
    instance until ``clear_instance()`` is invoked.
    """

    instance = None

    def __new__(cls):
        # Resolved inside the method, i.e. on first use rather than at module
        # import (presumably to avoid a circular import -- confirm).
        from agbenchmark.reports.agent_benchmark_config import (
            get_agent_benchmark_config,
        )

        # Already initialised: hand back the shared instance untouched.
        if cls.instance:
            return cls.instance

        cls.instance = super().__new__(cls)

        config = get_agent_benchmark_config()
        # Single timestamp shared by every manager created below.
        started_at = datetime.now(timezone.utc)

        # The managers are stored as class attributes so they can be reached
        # through the class without holding on to the instance.
        regression_path = config.get_regression_reports_path()
        cls.REGRESSION_MANAGER = ReportManager(regression_path, started_at)

        info_path = str(config.get_reports_path(started_at) / "report.json")
        cls.INFO_MANAGER = ReportManager(info_path, started_at)

        success_rate_path = config.get_success_rate_path()
        cls.INTERNAL_INFO_MANAGER = ReportManager(success_rate_path, started_at)

        return cls.instance

    @classmethod
    def clear_instance(cls):
        """Drop the singleton and its managers by resetting them to ``None``."""
        for attribute in (
            "instance",
            "REGRESSION_MANAGER",
            "INFO_MANAGER",
            "INTERNAL_INFO_MANAGER",
        ):
            setattr(cls, attribute, None)
class ReportManager:
"""Abstracts interaction with the regression tests file"""
def __init__(self, filename: str, benchmark_start_time: str):
self.filename = filename
self.start_time = time.time()
self.benchmark_start_time = benchmark_start_time
self.load()
def load(self) -> None:
if not os.path.exists(self.filename):
os.makedirs(os.path.dirname(self.filename), exist_ok=True)
with open(self.filename, "w") as f:
pass
try:
with open(self.filename, "r") as f:
file_content = (
f.read().strip()
) # read the content and remove any leading/trailing whitespace
if file_content: # if file is not empty, load the json
data = json.loads(file_content)
self.tests = {k: data[k] for k in sorted(data)}
else: # if file is empty, assign an empty dictionary
self.tests = {}
except FileNotFoundError:
self.tests = {}
except json.decoder.JSONDecodeError: # If JSON is invalid
self.tests = {}
self.save()
def save(self) -> None:
with open(self.filename, "w") as f:
json.dump(self.tests, f, indent=4)
def add_test(self, test_name: str, test_details: dict | list) -> None:
if test_name.startswith("Test"):
test_name = test_name[4:]
self.tests[test_name] = test_details
self.save()
def remove_test(self, test_name: str) -> None:
if test_name in self.tests:
del self.tests[test_name]
self.save()
def reset(self) -> None:
self.tests = {}
self.save()
def end_info_report(self, config: AgentBenchmarkConfig) -> None:
command = " ".join(sys.argv)
self.tests = {
"command": command.split(os.sep)[-1],
"benchmark_git_commit_sha": "---",
"agent_git_commit_sha": "---",
"completion_time": datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
),
"benchmark_start_time": self.benchmark_start_time.strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
),
"metrics": {
"run_time": str(round(time.time() - self.start_time, 2)) + " seconds",
"highest_difficulty": get_highest_success_difficulty(self.tests),
"total_cost": self.get_total_costs(),
},
"tests": copy.copy(self.tests),
"config": {
k: v for k, v in json.loads(config.json()).items() if v is not None
},
}
Report.parse_obj(self.tests)
converted_data = Report.parse_obj(self.tests)
agent_categories = get_agent_category(converted_data)
if len(agent_categories) > 1:
save_single_radar_chart(
agent_categories,
config.get_reports_path(self.benchmark_start_time) / "radar_chart.png",
)
self.save()
def get_total_costs(self):
total_cost = 0
all_costs_none = True
for test_name, test_data in self.tests.items():
cost = test_data["metrics"].get(
"cost", 0
) # gets the cost or defaults to 0 if cost is missing
if cost is not None: # check if cost is not None
all_costs_none = False
total_cost += cost # add cost to total
if all_costs_none:
total_cost = None
return total_cost
|