aboutsummaryrefslogtreecommitdiff
path: root/benchmark/agbenchmark/reports/ReportManager.py
blob: 8e56682e52b1a40748fdd406fcc4626a2bed3547 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import copy
import json
import os
import sys
import time
from datetime import datetime, timezone

from agbenchmark.reports.processing.graphs import save_single_radar_chart
from agbenchmark.reports.processing.process_report import get_agent_category
from agbenchmark.reports.processing.report_types import Report
from agbenchmark.utils.data_types import AgentBenchmarkConfig
from agbenchmark.utils.utils import get_highest_success_difficulty


class SingletonReportManager:
    instance = None

    def __new__(cls):
        from agbenchmark.reports.agent_benchmark_config import (
            get_agent_benchmark_config,
        )

        if not cls.instance:
            cls.instance = super(SingletonReportManager, cls).__new__(cls)

            agent_benchmark_config = get_agent_benchmark_config()
            benchmark_start_time_dt = datetime.now(
                timezone.utc
            )  # or any logic to fetch the datetime

            # Make the Managers class attributes
            cls.REGRESSION_MANAGER = ReportManager(
                agent_benchmark_config.get_regression_reports_path(),
                benchmark_start_time_dt,
            )
            cls.INFO_MANAGER = ReportManager(
                str(
                    agent_benchmark_config.get_reports_path(benchmark_start_time_dt)
                    / "report.json"
                ),
                benchmark_start_time_dt,
            )
            cls.INTERNAL_INFO_MANAGER = ReportManager(
                agent_benchmark_config.get_success_rate_path(), benchmark_start_time_dt
            )

        return cls.instance

    @classmethod
    def clear_instance(cls):
        cls.instance = None
        cls.REGRESSION_MANAGER = None
        cls.INFO_MANAGER = None
        cls.INTERNAL_INFO_MANAGER = None


class ReportManager:
    """Abstracts interaction with the regression tests file"""

    def __init__(self, filename: str, benchmark_start_time: str):
        self.filename = filename
        self.start_time = time.time()
        self.benchmark_start_time = benchmark_start_time

        self.load()

    def load(self) -> None:
        if not os.path.exists(self.filename):
            os.makedirs(os.path.dirname(self.filename), exist_ok=True)
            with open(self.filename, "w") as f:
                pass

        try:
            with open(self.filename, "r") as f:
                file_content = (
                    f.read().strip()
                )  # read the content and remove any leading/trailing whitespace
                if file_content:  # if file is not empty, load the json
                    data = json.loads(file_content)
                    self.tests = {k: data[k] for k in sorted(data)}
                else:  # if file is empty, assign an empty dictionary
                    self.tests = {}
        except FileNotFoundError:
            self.tests = {}
        except json.decoder.JSONDecodeError:  # If JSON is invalid
            self.tests = {}
        self.save()

    def save(self) -> None:
        with open(self.filename, "w") as f:
            json.dump(self.tests, f, indent=4)

    def add_test(self, test_name: str, test_details: dict | list) -> None:
        if test_name.startswith("Test"):
            test_name = test_name[4:]
        self.tests[test_name] = test_details

        self.save()

    def remove_test(self, test_name: str) -> None:
        if test_name in self.tests:
            del self.tests[test_name]
            self.save()

    def reset(self) -> None:
        self.tests = {}
        self.save()

    def end_info_report(self, config: AgentBenchmarkConfig) -> None:
        command = " ".join(sys.argv)

        self.tests = {
            "command": command.split(os.sep)[-1],
            "benchmark_git_commit_sha": "---",
            "agent_git_commit_sha": "---",
            "completion_time": datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%S+00:00"
            ),
            "benchmark_start_time": self.benchmark_start_time.strftime(
                "%Y-%m-%dT%H:%M:%S+00:00"
            ),
            "metrics": {
                "run_time": str(round(time.time() - self.start_time, 2)) + " seconds",
                "highest_difficulty": get_highest_success_difficulty(self.tests),
                "total_cost": self.get_total_costs(),
            },
            "tests": copy.copy(self.tests),
            "config": {
                k: v for k, v in json.loads(config.json()).items() if v is not None
            },
        }
        Report.parse_obj(self.tests)

        converted_data = Report.parse_obj(self.tests)

        agent_categories = get_agent_category(converted_data)
        if len(agent_categories) > 1:
            save_single_radar_chart(
                agent_categories,
                config.get_reports_path(self.benchmark_start_time) / "radar_chart.png",
            )

        self.save()

    def get_total_costs(self):
        total_cost = 0
        all_costs_none = True
        for test_name, test_data in self.tests.items():
            cost = test_data["metrics"].get(
                "cost", 0
            )  # gets the cost or defaults to 0 if cost is missing

            if cost is not None:  # check if cost is not None
                all_costs_none = False
                total_cost += cost  # add cost to total
        if all_costs_none:
            total_cost = None
        return total_cost