diff options
Diffstat (limited to 'autogpts/autogpt/autogpt/app/agent_protocol_server.py')
-rw-r--r-- | autogpts/autogpt/autogpt/app/agent_protocol_server.py | 481 |
1 file changed, 481 insertions, 0 deletions
diff --git a/autogpts/autogpt/autogpt/app/agent_protocol_server.py b/autogpts/autogpt/autogpt/app/agent_protocol_server.py new file mode 100644 index 000000000..12e54582b --- /dev/null +++ b/autogpts/autogpt/autogpt/app/agent_protocol_server.py @@ -0,0 +1,481 @@ +import logging +import os +import pathlib +from io import BytesIO +from uuid import uuid4 + +import orjson +from fastapi import APIRouter, FastAPI, UploadFile +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import RedirectResponse, StreamingResponse +from fastapi.staticfiles import StaticFiles +from forge.sdk.db import AgentDB +from forge.sdk.errors import NotFoundError +from forge.sdk.middlewares import AgentMiddleware +from forge.sdk.model import ( + Artifact, + Step, + StepRequestBody, + Task, + TaskArtifactsListResponse, + TaskListResponse, + TaskRequestBody, + TaskStepsListResponse, +) +from forge.sdk.routes.agent_protocol import base_router +from hypercorn.asyncio import serve as hypercorn_serve +from hypercorn.config import Config as HypercornConfig + +from autogpt.agent_factory.configurators import configure_agent_with_state +from autogpt.agent_factory.generators import generate_agent_for_task +from autogpt.agent_manager import AgentManager +from autogpt.commands.system import finish +from autogpt.commands.user_interaction import ask_user +from autogpt.config import Config +from autogpt.core.resource.model_providers import ChatModelProvider +from autogpt.core.resource.model_providers.openai import OpenAIProvider +from autogpt.core.resource.model_providers.schema import ModelProviderBudget +from autogpt.file_workspace import ( + FileWorkspace, + FileWorkspaceBackendName, + get_workspace, +) +from autogpt.logs.utils import fmt_kwargs +from autogpt.models.action_history import ActionErrorResult, ActionSuccessResult + +logger = logging.getLogger(__name__) + + +class AgentProtocolServer: + _task_budgets: dict[str, ModelProviderBudget] + + def __init__( + self, + app_config: 
Config, + database: AgentDB, + llm_provider: ChatModelProvider, + ): + self.app_config = app_config + self.db = database + self.llm_provider = llm_provider + self.agent_manager = AgentManager(app_data_dir=app_config.app_data_dir) + self._task_budgets = {} + + async def start(self, port: int = 8000, router: APIRouter = base_router): + """Start the agent server.""" + logger.debug("Starting the agent server...") + config = HypercornConfig() + config.bind = [f"localhost:{port}"] + app = FastAPI( + title="AutoGPT Server", + description="Forked from AutoGPT Forge; " + "Modified version of The Agent Protocol.", + version="v0.4", + ) + + # Add CORS middleware + origins = [ + "*", + # Add any other origins you want to whitelist + ] + + app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + app.include_router(router, prefix="/ap/v1") + script_dir = os.path.dirname(os.path.realpath(__file__)) + frontend_path = ( + pathlib.Path(script_dir) + .joinpath("../../../../frontend/build/web") + .resolve() + ) + + if os.path.exists(frontend_path): + app.mount("/app", StaticFiles(directory=frontend_path), name="app") + + @app.get("/", include_in_schema=False) + async def root(): + return RedirectResponse(url="/app/index.html", status_code=307) + + else: + logger.warning( + f"Frontend not found. {frontend_path} does not exist. " + "The frontend will not be available." + ) + + # Used to access the methods on this class from API route handlers + app.add_middleware(AgentMiddleware, agent=self) + + config.loglevel = "ERROR" + config.bind = [f"0.0.0.0:{port}"] + + logger.info(f"AutoGPT server starting on http://localhost:{port}") + await hypercorn_serve(app, config) + + async def create_task(self, task_request: TaskRequestBody) -> Task: + """ + Create a task for the agent. 
+ """ + task = await self.db.create_task( + input=task_request.input, + additional_input=task_request.additional_input, + ) + logger.debug(f"Creating agent for task: '{task.input}'") + task_agent = await generate_agent_for_task( + task=task.input, + app_config=self.app_config, + llm_provider=self._get_task_llm_provider(task), + ) + + # Assign an ID and a folder to the Agent and persist it + agent_id = task_agent.state.agent_id = task_agent_id(task.task_id) + logger.debug(f"New agent ID: {agent_id}") + task_agent.attach_fs(self.app_config.app_data_dir / "agents" / agent_id) + task_agent.state.save_to_json_file(task_agent.file_manager.state_file_path) + + return task + + async def list_tasks(self, page: int = 1, pageSize: int = 10) -> TaskListResponse: + """ + List all tasks that the agent has created. + """ + logger.debug("Listing all tasks...") + tasks, pagination = await self.db.list_tasks(page, pageSize) + response = TaskListResponse(tasks=tasks, pagination=pagination) + return response + + async def get_task(self, task_id: str) -> Task: + """ + Get a task by ID. + """ + logger.debug(f"Getting task with ID: {task_id}...") + task = await self.db.get_task(task_id) + return task + + async def list_steps( + self, task_id: str, page: int = 1, pageSize: int = 10 + ) -> TaskStepsListResponse: + """ + List the IDs of all steps that the task has created. 
+ """ + logger.debug(f"Listing all steps created by task with ID: {task_id}...") + steps, pagination = await self.db.list_steps(task_id, page, pageSize) + response = TaskStepsListResponse(steps=steps, pagination=pagination) + return response + + async def execute_step(self, task_id: str, step_request: StepRequestBody) -> Step: + """Create a step for the task.""" + logger.debug(f"Creating a step for task with ID: {task_id}...") + + # Restore Agent instance + task = await self.get_task(task_id) + agent = configure_agent_with_state( + state=self.agent_manager.retrieve_state(task_agent_id(task_id)), + app_config=self.app_config, + llm_provider=self._get_task_llm_provider(task), + ) + + # According to the Agent Protocol spec, the first execute_step request contains + # the same task input as the parent create_task request. + # To prevent this from interfering with the agent's process, we ignore the input + # of this first step request, and just generate the first step proposal. + is_init_step = not bool(agent.event_history) + execute_command, execute_command_args, execute_result = None, None, None + execute_approved = False + + # HACK: only for compatibility with AGBenchmark + if step_request.input == "y": + step_request.input = "" + + user_input = step_request.input if not is_init_step else "" + + if ( + not is_init_step + and agent.event_history.current_episode + and not agent.event_history.current_episode.result + ): + execute_command = agent.event_history.current_episode.action.name + execute_command_args = agent.event_history.current_episode.action.args + execute_approved = not user_input + + logger.debug( + f"Agent proposed command" + f" {execute_command}({fmt_kwargs(execute_command_args)})." 
+ f" User input/feedback: {repr(user_input)}" + ) + + # Save step request + step = await self.db.create_step( + task_id=task_id, + input=step_request, + is_last=execute_command == finish.__name__ and execute_approved, + ) + agent.llm_provider = self._get_task_llm_provider(task, step.step_id) + + # Execute previously proposed action + if execute_command: + assert execute_command_args is not None + agent.workspace.on_write_file = lambda path: self._on_agent_write_file( + task=task, step=step, relative_path=path + ) + + if step.is_last and execute_command == finish.__name__: + assert execute_command_args + step = await self.db.update_step( + task_id=task_id, + step_id=step.step_id, + output=execute_command_args["reason"], + ) + logger.info( + f"Total LLM cost for task {task_id}: " + f"${round(agent.llm_provider.get_incurred_cost(), 2)}" + ) + return step + + if execute_command == ask_user.__name__: # HACK + execute_result = ActionSuccessResult(outputs=user_input) + agent.event_history.register_result(execute_result) + elif not execute_command: + execute_result = None + elif execute_approved: + step = await self.db.update_step( + task_id=task_id, + step_id=step.step_id, + status="running", + ) + # Execute previously proposed action + execute_result = await agent.execute( + command_name=execute_command, + command_args=execute_command_args, + ) + else: + assert user_input + execute_result = await agent.execute( + command_name="human_feedback", # HACK + command_args={}, + user_input=user_input, + ) + + # Propose next action + try: + next_command, next_command_args, raw_output = await agent.propose_action() + logger.debug(f"AI output: {raw_output}") + except Exception as e: + step = await self.db.update_step( + task_id=task_id, + step_id=step.step_id, + status="completed", + output=f"An error occurred while proposing the next action: {e}", + ) + return step + + # Format step output + output = ( + ( + f"`{execute_command}({fmt_kwargs(execute_command_args)})` returned:" + + 
("\n\n" if "\n" in str(execute_result) else " ") + + f"{execute_result}\n\n" + ) + if execute_command_args and execute_command != ask_user.__name__ + else "" + ) + output += f"{raw_output['thoughts']['speak']}\n\n" + output += ( + f"Next Command: {next_command}({fmt_kwargs(next_command_args)})" + if next_command != ask_user.__name__ + else next_command_args["question"] + ) + + additional_output = { + **( + { + "last_action": { + "name": execute_command, + "args": execute_command_args, + "result": ( + orjson.loads(execute_result.json()) + if not isinstance(execute_result, ActionErrorResult) + else { + "error": str(execute_result.error), + "reason": execute_result.reason, + } + ), + }, + } + if not is_init_step + else {} + ), + **raw_output, + } + + step = await self.db.update_step( + task_id=task_id, + step_id=step.step_id, + status="completed", + output=output, + additional_output=additional_output, + ) + + logger.debug( + f"Running total LLM cost for task {task_id}: " + f"${round(agent.llm_provider.get_incurred_cost(), 3)}" + ) + agent.state.save_to_json_file(agent.file_manager.state_file_path) + return step + + async def _on_agent_write_file( + self, task: Task, step: Step, relative_path: pathlib.Path + ) -> None: + """ + Creates an Artifact for the written file, or updates the Artifact if it exists. 
+ """ + if relative_path.is_absolute(): + raise ValueError(f"File path '{relative_path}' is not relative") + for a in task.artifacts or []: + if a.relative_path == str(relative_path): + logger.debug(f"Updating Artifact after writing to existing file: {a}") + if not a.agent_created: + await self.db.update_artifact(a.artifact_id, agent_created=True) + break + else: + logger.debug(f"Creating Artifact for new file '{relative_path}'") + await self.db.create_artifact( + task_id=step.task_id, + step_id=step.step_id, + file_name=relative_path.parts[-1], + agent_created=True, + relative_path=str(relative_path), + ) + + async def get_step(self, task_id: str, step_id: str) -> Step: + """ + Get a step by ID. + """ + step = await self.db.get_step(task_id, step_id) + return step + + async def list_artifacts( + self, task_id: str, page: int = 1, pageSize: int = 10 + ) -> TaskArtifactsListResponse: + """ + List the artifacts that the task has created. + """ + artifacts, pagination = await self.db.list_artifacts(task_id, page, pageSize) + return TaskArtifactsListResponse(artifacts=artifacts, pagination=pagination) + + async def create_artifact( + self, task_id: str, file: UploadFile, relative_path: str + ) -> Artifact: + """ + Create an artifact for the task. + """ + file_name = file.filename or str(uuid4()) + data = b"" + while contents := file.file.read(1024 * 1024): + data += contents + # Check if relative path ends with filename + if relative_path.endswith(file_name): + file_path = relative_path + else: + file_path = os.path.join(relative_path, file_name) + + workspace = self._get_task_agent_file_workspace(task_id, self.agent_manager) + await workspace.write_file(file_path, data) + + artifact = await self.db.create_artifact( + task_id=task_id, + file_name=file_name, + relative_path=relative_path, + agent_created=False, + ) + return artifact + + async def get_artifact(self, task_id: str, artifact_id: str) -> StreamingResponse: + """ + Download a task artifact by ID. 
+ """ + try: + artifact = await self.db.get_artifact(artifact_id) + if artifact.file_name not in artifact.relative_path: + file_path = os.path.join(artifact.relative_path, artifact.file_name) + else: + file_path = artifact.relative_path + workspace = self._get_task_agent_file_workspace(task_id, self.agent_manager) + retrieved_artifact = workspace.read_file(file_path, binary=True) + except NotFoundError: + raise + except FileNotFoundError: + raise + + return StreamingResponse( + BytesIO(retrieved_artifact), + media_type="application/octet-stream", + headers={ + "Content-Disposition": f'attachment; filename="{artifact.file_name}"' + }, + ) + + def _get_task_agent_file_workspace( + self, + task_id: str | int, + agent_manager: AgentManager, + ) -> FileWorkspace: + use_local_ws = ( + self.app_config.workspace_backend == FileWorkspaceBackendName.LOCAL + ) + agent_id = task_agent_id(task_id) + workspace = get_workspace( + backend=self.app_config.workspace_backend, + id=agent_id if not use_local_ws else "", + root_path=agent_manager.get_agent_dir( + agent_id=agent_id, + must_exist=True, + ) + / "workspace" + if use_local_ws + else None, + ) + workspace.initialize() + return workspace + + def _get_task_llm_provider( + self, task: Task, step_id: str = "" + ) -> ChatModelProvider: + """ + Configures the LLM provider with headers to link outgoing requests to the task. 
+ """ + task_llm_budget = self._task_budgets.get( + task.task_id, self.llm_provider.default_settings.budget.copy(deep=True) + ) + + task_llm_provider_config = self.llm_provider._configuration.copy(deep=True) + _extra_request_headers = task_llm_provider_config.extra_request_headers + _extra_request_headers["AP-TaskID"] = task.task_id + if step_id: + _extra_request_headers["AP-StepID"] = step_id + if task.additional_input and (user_id := task.additional_input.get("user_id")): + _extra_request_headers["AutoGPT-UserID"] = user_id + + task_llm_provider = None + if isinstance(self.llm_provider, OpenAIProvider): + settings = self.llm_provider._settings.copy() + settings.budget = task_llm_budget + settings.configuration = task_llm_provider_config # type: ignore + task_llm_provider = OpenAIProvider( + settings=settings, + logger=logger.getChild(f"Task-{task.task_id}_OpenAIProvider"), + ) + + if task_llm_provider and task_llm_provider._budget: + self._task_budgets[task.task_id] = task_llm_provider._budget + + return task_llm_provider or self.llm_provider + + +def task_agent_id(task_id: str | int) -> str: + return f"AutoGPT-{task_id}" |