Diffstat (limited to 'autogpts/autogpt/autogpt/app/agent_protocol_server.py')
-rw-r--r-- | autogpts/autogpt/autogpt/app/agent_protocol_server.py | 181
1 file changed, 130 insertions, 51 deletions
diff --git a/autogpts/autogpt/autogpt/app/agent_protocol_server.py b/autogpts/autogpt/autogpt/app/agent_protocol_server.py
index d59b79628..12e54582b 100644
--- a/autogpts/autogpt/autogpt/app/agent_protocol_server.py
+++ b/autogpts/autogpt/autogpt/app/agent_protocol_server.py
@@ -12,8 +12,7 @@ from fastapi.staticfiles import StaticFiles
 from forge.sdk.db import AgentDB
 from forge.sdk.errors import NotFoundError
 from forge.sdk.middlewares import AgentMiddleware
-from forge.sdk.routes.agent_protocol import base_router
-from forge.sdk.schema import (
+from forge.sdk.model import (
     Artifact,
     Step,
     StepRequestBody,
@@ -23,6 +22,7 @@ from forge.sdk.schema import (
     TaskRequestBody,
     TaskStepsListResponse,
 )
+from forge.sdk.routes.agent_protocol import base_router
 from hypercorn.asyncio import serve as hypercorn_serve
 from hypercorn.config import Config as HypercornConfig

@@ -33,13 +33,22 @@ from autogpt.commands.system import finish
 from autogpt.commands.user_interaction import ask_user
 from autogpt.config import Config
 from autogpt.core.resource.model_providers import ChatModelProvider
-from autogpt.file_workspace import FileWorkspace
+from autogpt.core.resource.model_providers.openai import OpenAIProvider
+from autogpt.core.resource.model_providers.schema import ModelProviderBudget
+from autogpt.file_workspace import (
+    FileWorkspace,
+    FileWorkspaceBackendName,
+    get_workspace,
+)
+from autogpt.logs.utils import fmt_kwargs
 from autogpt.models.action_history import ActionErrorResult, ActionSuccessResult

 logger = logging.getLogger(__name__)


 class AgentProtocolServer:
+    _task_budgets: dict[str, ModelProviderBudget]
+
     def __init__(
         self,
         app_config: Config,
@@ -50,6 +59,7 @@ class AgentProtocolServer:
         self.db = database
         self.llm_provider = llm_provider
         self.agent_manager = AgentManager(app_data_dir=app_config.app_data_dir)
+        self._task_budgets = {}

     async def start(self, port: int = 8000, router: APIRouter = base_router):
         """Start the agent server."""
@@ -58,18 +68,14 @@ class AgentProtocolServer:
         config.bind = [f"localhost:{port}"]
         app = FastAPI(
             title="AutoGPT Server",
-            description="Forked from AutoGPT Forge; Modified version of The Agent Protocol.",
+            description="Forked from AutoGPT Forge; "
+            "Modified version of The Agent Protocol.",
             version="v0.4",
         )

         # Add CORS middleware
         origins = [
-            "http://localhost:5000",
-            "http://127.0.0.1:5000",
-            "http://localhost:8000",
-            "http://127.0.0.1:8000",
-            "http://localhost:8080",
-            "http://127.0.0.1:8080",
+            "*",
             # Add any other origins you want to whitelist
         ]

@@ -98,7 +104,8 @@ class AgentProtocolServer:

         else:
             logger.warning(
-                f"Frontend not found. {frontend_path} does not exist. The frontend will not be available."
+                f"Frontend not found. {frontend_path} does not exist. "
+                "The frontend will not be available."
             )

         # Used to access the methods on this class from API route handlers
@@ -114,20 +121,23 @@ class AgentProtocolServer:
         """
         Create a task for the agent.
         """
-        logger.debug(f"Creating agent for task: '{task_request.input}'")
-        task_agent = await generate_agent_for_task(
-            task=task_request.input,
-            app_config=self.app_config,
-            llm_provider=self.llm_provider,
-        )
         task = await self.db.create_task(
             input=task_request.input,
             additional_input=task_request.additional_input,
         )
+        logger.debug(f"Creating agent for task: '{task.input}'")
+        task_agent = await generate_agent_for_task(
+            task=task.input,
+            app_config=self.app_config,
+            llm_provider=self._get_task_llm_provider(task),
+        )
+
+        # Assign an ID and a folder to the Agent and persist it
         agent_id = task_agent.state.agent_id = task_agent_id(task.task_id)
         logger.debug(f"New agent ID: {agent_id}")
         task_agent.attach_fs(self.app_config.app_data_dir / "agents" / agent_id)
         task_agent.state.save_to_json_file(task_agent.file_manager.state_file_path)
+
         return task

     async def list_tasks(self, page: int = 1, pageSize: int = 10) -> TaskListResponse:
@@ -139,7 +149,7 @@ class AgentProtocolServer:
         response = TaskListResponse(tasks=tasks, pagination=pagination)
         return response

-    async def get_task(self, task_id: int) -> Task:
+    async def get_task(self, task_id: str) -> Task:
         """
         Get a task by ID.
         """
@@ -163,15 +173,11 @@ class AgentProtocolServer:
         logger.debug(f"Creating a step for task with ID: {task_id}...")

         # Restore Agent instance
+        task = await self.get_task(task_id)
         agent = configure_agent_with_state(
             state=self.agent_manager.retrieve_state(task_agent_id(task_id)),
             app_config=self.app_config,
-            llm_provider=self.llm_provider,
-        )
-        agent.workspace.on_write_file = lambda path: self.db.create_artifact(
-            task_id=task_id,
-            file_name=path.parts[-1],
-            relative_path=str(path),
+            llm_provider=self._get_task_llm_provider(task),
         )

         # According to the Agent Protocol spec, the first execute_step request contains
@@ -209,10 +215,14 @@ class AgentProtocolServer:
             input=step_request,
             is_last=execute_command == finish.__name__ and execute_approved,
         )
+        agent.llm_provider = self._get_task_llm_provider(task, step.step_id)

         # Execute previously proposed action
         if execute_command:
             assert execute_command_args is not None
+            agent.workspace.on_write_file = lambda path: self._on_agent_write_file(
+                task=task, step=step, relative_path=path
+            )

             if step.is_last and execute_command == finish.__name__:
                 assert execute_command_args
@@ -221,6 +231,10 @@ class AgentProtocolServer:
                     step_id=step.step_id,
                     output=execute_command_args["reason"],
                 )
+                logger.info(
+                    f"Total LLM cost for task {task_id}: "
+                    f"${round(agent.llm_provider.get_incurred_cost(), 2)}"
+                )
                 return step

             if execute_command == ask_user.__name__:  # HACK
@@ -263,17 +277,17 @@ class AgentProtocolServer:
         # Format step output
         output = (
             (
-                f"Command `{execute_command}({fmt_kwargs(execute_command_args)})` returned:"
+                f"`{execute_command}({fmt_kwargs(execute_command_args)})` returned:"
                 + ("\n\n" if "\n" in str(execute_result) else " ")
                 + f"{execute_result}\n\n"
             )
-            if execute_command_args and execute_command != "ask_user"
+            if execute_command_args and execute_command != ask_user.__name__
             else ""
         )
         output += f"{raw_output['thoughts']['speak']}\n\n"
         output += (
             f"Next Command: {next_command}({fmt_kwargs(next_command_args)})"
-            if next_command != "ask_user"
+            if next_command != ask_user.__name__
             else next_command_args["question"]
         )

@@ -307,9 +321,37 @@ class AgentProtocolServer:
             additional_output=additional_output,
         )

+        logger.debug(
+            f"Running total LLM cost for task {task_id}: "
+            f"${round(agent.llm_provider.get_incurred_cost(), 3)}"
+        )
         agent.state.save_to_json_file(agent.file_manager.state_file_path)
         return step

+    async def _on_agent_write_file(
+        self, task: Task, step: Step, relative_path: pathlib.Path
+    ) -> None:
+        """
+        Creates an Artifact for the written file, or updates the Artifact if it exists.
+        """
+        if relative_path.is_absolute():
+            raise ValueError(f"File path '{relative_path}' is not relative")
+        for a in task.artifacts or []:
+            if a.relative_path == str(relative_path):
+                logger.debug(f"Updating Artifact after writing to existing file: {a}")
+                if not a.agent_created:
+                    await self.db.update_artifact(a.artifact_id, agent_created=True)
+                break
+        else:
+            logger.debug(f"Creating Artifact for new file '{relative_path}'")
+            await self.db.create_artifact(
+                task_id=step.task_id,
+                step_id=step.step_id,
+                file_name=relative_path.parts[-1],
+                agent_created=True,
+                relative_path=str(relative_path),
+            )
+
     async def get_step(self, task_id: str, step_id: str) -> Step:
         """
         Get a step by ID.
@@ -332,7 +374,6 @@ class AgentProtocolServer:
         """
         Create an artifact for the task.
         """
-        data = None
         file_name = file.filename or str(uuid4())
         data = b""
         while contents := file.file.read(1024 * 1024):
@@ -343,7 +384,7 @@ class AgentProtocolServer:
         else:
             file_path = os.path.join(relative_path, file_name)

-        workspace = get_task_agent_file_workspace(task_id, self.agent_manager)
+        workspace = self._get_task_agent_file_workspace(task_id, self.agent_manager)
         await workspace.write_file(file_path, data)

         artifact = await self.db.create_artifact(
@@ -354,9 +395,9 @@ class AgentProtocolServer:
         )
         return artifact

-    async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
+    async def get_artifact(self, task_id: str, artifact_id: str) -> StreamingResponse:
         """
-        Get an artifact by ID.
+        Download a task artifact by ID.
         """
         try:
             artifact = await self.db.get_artifact(artifact_id)
@@ -364,39 +405,77 @@ class AgentProtocolServer:
                 file_path = os.path.join(artifact.relative_path, artifact.file_name)
             else:
                 file_path = artifact.relative_path
-            workspace = get_task_agent_file_workspace(task_id, self.agent_manager)
+            workspace = self._get_task_agent_file_workspace(task_id, self.agent_manager)
             retrieved_artifact = workspace.read_file(file_path, binary=True)
-        except NotFoundError as e:
+        except NotFoundError:
             raise
-        except FileNotFoundError as e:
+        except FileNotFoundError:
             raise

         return StreamingResponse(
             BytesIO(retrieved_artifact),
             media_type="application/octet-stream",
             headers={
-                "Content-Disposition": f"attachment; filename={artifact.file_name}"
+                "Content-Disposition": f'attachment; filename="{artifact.file_name}"'
             },
         )

+    def _get_task_agent_file_workspace(
+        self,
+        task_id: str | int,
+        agent_manager: AgentManager,
+    ) -> FileWorkspace:
+        use_local_ws = (
+            self.app_config.workspace_backend == FileWorkspaceBackendName.LOCAL
+        )
+        agent_id = task_agent_id(task_id)
+        workspace = get_workspace(
+            backend=self.app_config.workspace_backend,
+            id=agent_id if not use_local_ws else "",
+            root_path=agent_manager.get_agent_dir(
+                agent_id=agent_id,
+                must_exist=True,
+            )
+            / "workspace"
+            if use_local_ws
+            else None,
+        )
+        workspace.initialize()
+        return workspace

-def task_agent_id(task_id: str | int) -> str:
-    return f"AutoGPT-{task_id}"

+    def _get_task_llm_provider(
+        self, task: Task, step_id: str = ""
+    ) -> ChatModelProvider:
+        """
+        Configures the LLM provider with headers to link outgoing requests to the task.
+        """
+        task_llm_budget = self._task_budgets.get(
+            task.task_id, self.llm_provider.default_settings.budget.copy(deep=True)
+        )
+        task_llm_provider_config = self.llm_provider._configuration.copy(deep=True)
+        _extra_request_headers = task_llm_provider_config.extra_request_headers
+        _extra_request_headers["AP-TaskID"] = task.task_id
+        if step_id:
+            _extra_request_headers["AP-StepID"] = step_id
+        if task.additional_input and (user_id := task.additional_input.get("user_id")):
+            _extra_request_headers["AutoGPT-UserID"] = user_id
+
+        task_llm_provider = None
+        if isinstance(self.llm_provider, OpenAIProvider):
+            settings = self.llm_provider._settings.copy()
+            settings.budget = task_llm_budget
+            settings.configuration = task_llm_provider_config  # type: ignore
+            task_llm_provider = OpenAIProvider(
+                settings=settings,
+                logger=logger.getChild(f"Task-{task.task_id}_OpenAIProvider"),
+            )

-def get_task_agent_file_workspace(
-    task_id: str | int,
-    agent_manager: AgentManager,
-) -> FileWorkspace:
-    return FileWorkspace(
-        root=agent_manager.get_agent_dir(
-            agent_id=task_agent_id(task_id),
-            must_exist=True,
-        )
-        / "workspace",
-        restrict_to_root=True,
-    )
+        if task_llm_provider and task_llm_provider._budget:
+            self._task_budgets[task.task_id] = task_llm_provider._budget
+
+        return task_llm_provider or self.llm_provider


-def fmt_kwargs(kwargs: dict) -> str:
-    return ", ".join(f"{n}={repr(v)}" for n, v in kwargs.items())
+def task_agent_id(task_id: str | int) -> str:
+    return f"AutoGPT-{task_id}"