diff options
Diffstat (limited to 'autogpt/commands/file_operations.py')
-rw-r--r-- | autogpt/commands/file_operations.py | 340 |
1 files changed, 0 insertions, 340 deletions
diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py deleted file mode 100644 index 0a06da318..000000000 --- a/autogpt/commands/file_operations.py +++ /dev/null @@ -1,340 +0,0 @@ -"""File operations for AutoGPT""" -from __future__ import annotations - -import contextlib -import hashlib -import os -import os.path -from pathlib import Path -from typing import Generator, Literal - -from autogpt.agent.agent import Agent -from autogpt.command_decorator import command -from autogpt.logs import logger -from autogpt.memory.vector import MemoryItem, VectorMemory - -from .decorators import sanitize_path_arg -from .file_operations_utils import read_textual_file - -Operation = Literal["write", "append", "delete"] - - -def text_checksum(text: str) -> str: - """Get the hex checksum for the given text.""" - return hashlib.md5(text.encode("utf-8")).hexdigest() - - -def operations_from_log( - log_path: str, -) -> Generator[tuple[Operation, str, str | None], None, None]: - """Parse the file operations log and return a tuple containing the log entries""" - try: - log = open(log_path, "r", encoding="utf-8") - except FileNotFoundError: - return - - for line in log: - line = line.replace("File Operation Logger", "").strip() - if not line: - continue - operation, tail = line.split(": ", maxsplit=1) - operation = operation.strip() - if operation in ("write", "append"): - try: - path, checksum = (x.strip() for x in tail.rsplit(" #", maxsplit=1)) - except ValueError: - logger.warn(f"File log entry lacks checksum: '{line}'") - path, checksum = tail.strip(), None - yield (operation, path, checksum) - elif operation == "delete": - yield (operation, tail.strip(), None) - - log.close() - - -def file_operations_state(log_path: str) -> dict[str, str]: - """Iterates over the operations log and returns the expected state. - - Parses a log file at config.file_logger_path to construct a dictionary that maps - each file path written or appended to its checksum. Deleted files are removed - from the dictionary. - - Returns: - A dictionary mapping file paths to their checksums. - - Raises: - FileNotFoundError: If config.file_logger_path is not found. - ValueError: If the log file content is not in the expected format. - """ - state = {} - for operation, path, checksum in operations_from_log(log_path): - if operation in ("write", "append"): - state[path] = checksum - elif operation == "delete": - del state[path] - return state - - -@sanitize_path_arg("filename") -def is_duplicate_operation( - operation: Operation, filename: str, agent: Agent, checksum: str | None = None -) -> bool: - """Check if the operation has already been performed - - Args: - operation: The operation to check for - filename: The name of the file to check for - agent: The agent - checksum: The checksum of the contents to be written - - Returns: - True if the operation has already been performed on the file - """ - # Make the filename into a relative path if possible - with contextlib.suppress(ValueError): - filename = str(Path(filename).relative_to(agent.workspace.root)) - - state = file_operations_state(agent.config.file_logger_path) - if operation == "delete" and filename not in state: - return True - if operation == "write" and state.get(filename) == checksum: - return True - return False - - -@sanitize_path_arg("filename") -def log_operation( - operation: Operation, filename: str, agent: Agent, checksum: str | None = None -) -> None: - """Log the file operation to the file_logger.txt - - Args: - operation: The operation to log - filename: The name of the file the operation was performed on - checksum: The checksum of the contents to be written - """ - # Make the filename into a relative path if possible - with contextlib.suppress(ValueError): - filename = str(Path(filename).relative_to(agent.workspace.root)) - - log_entry = f"{operation}: {filename}" - if checksum is not None: - log_entry += f" #{checksum}" - logger.debug(f"Logging file operation: {log_entry}") - append_to_file( - agent.config.file_logger_path, f"{log_entry}\n", agent, should_log=False - ) - - -@command( - "read_file", - "Read an existing file", - { - "filename": { - "type": "string", - "description": "The path of the file to read", - "required": True, - } - }, -) -@sanitize_path_arg("filename") -def read_file(filename: str, agent: Agent) -> str: - """Read a file and return the contents - - Args: - filename (str): The name of the file to read - - Returns: - str: The contents of the file - """ - try: - content = read_textual_file(filename, logger) - - # TODO: invalidate/update memory when file is edited - file_memory = MemoryItem.from_text_file(content, filename, agent.config) - if len(file_memory.chunks) > 1: - return file_memory.summary - - return content - except Exception as e: - return f"Error: {str(e)}" - - -def ingest_file( - filename: str, - memory: VectorMemory, -) -> None: - """ - Ingest a file by reading its content, splitting it into chunks with a specified - maximum length and overlap, and adding the chunks to the memory storage. - - Args: - filename: The name of the file to ingest - memory: An object with an add() method to store the chunks in memory - """ - try: - logger.info(f"Ingesting file {filename}") - content = read_file(filename) - - # TODO: differentiate between different types of files - file_memory = MemoryItem.from_text_file(content, filename) - logger.debug(f"Created memory: {file_memory.dump(True)}") - memory.add(file_memory) - - logger.info(f"Ingested {len(file_memory.e_chunks)} chunks from {filename}") - except Exception as err: - logger.warn(f"Error while ingesting file '{filename}': {err}") - - -@command( - "write_to_file", - "Writes to a file", - { - "filename": { - "type": "string", - "description": "The name of the file to write to", - "required": True, - }, - "text": { - "type": "string", - "description": "The text to write to the file", - "required": True, - }, - }, - aliases=["write_file", "create_file"], -) -@sanitize_path_arg("filename") -def write_to_file(filename: str, text: str, agent: Agent) -> str: - """Write text to a file - - Args: - filename (str): The name of the file to write to - text (str): The text to write to the file - - Returns: - str: A message indicating success or failure - """ - checksum = text_checksum(text) - if is_duplicate_operation("write", filename, agent, checksum): - return "Error: File has already been updated." - try: - directory = os.path.dirname(filename) - os.makedirs(directory, exist_ok=True) - with open(filename, "w", encoding="utf-8") as f: - f.write(text) - log_operation("write", filename, agent, checksum) - return "File written to successfully." - except Exception as err: - return f"Error: {err}" - - -@command( - "append_to_file", - "Appends to a file", - { - "filename": { - "type": "string", - "description": "The name of the file to write to", - "required": True, - }, - "text": { - "type": "string", - "description": "The text to write to the file", - "required": True, - }, - }, -) -@sanitize_path_arg("filename") -def append_to_file( - filename: str, text: str, agent: Agent, should_log: bool = True -) -> str: - """Append text to a file - - Args: - filename (str): The name of the file to append to - text (str): The text to append to the file - should_log (bool): Should log output - - Returns: - str: A message indicating success or failure - """ - try: - directory = os.path.dirname(filename) - os.makedirs(directory, exist_ok=True) - with open(filename, "a", encoding="utf-8") as f: - f.write(text) - - if should_log: - with open(filename, "r", encoding="utf-8") as f: - checksum = text_checksum(f.read()) - log_operation("append", filename, agent, checksum=checksum) - - return "Text appended successfully." - except Exception as err: - return f"Error: {err}" - - -@command( - "delete_file", - "Deletes a file", - { - "filename": { - "type": "string", - "description": "The name of the file to delete", - "required": True, - } - }, -) -@sanitize_path_arg("filename") -def delete_file(filename: str, agent: Agent) -> str: - """Delete a file - - Args: - filename (str): The name of the file to delete - - Returns: - str: A message indicating success or failure - """ - if is_duplicate_operation("delete", filename, agent): - return "Error: File has already been deleted." - try: - os.remove(filename) - log_operation("delete", filename, agent) - return "File deleted successfully." - except Exception as err: - return f"Error: {err}" - - -@command( - "list_files", - "Lists Files in a Directory", - { - "directory": { - "type": "string", - "description": "The directory to list files in", - "required": True, - } - }, -) -@sanitize_path_arg("directory") -def list_files(directory: str, agent: Agent) -> list[str]: - """lists files in a directory recursively - - Args: - directory (str): The directory to search in - - Returns: - list[str]: A list of files found in the directory - """ - found_files = [] - - for root, _, files in os.walk(directory): - for file in files: - if file.startswith("."): - continue - relative_path = os.path.relpath( - os.path.join(root, file), agent.config.workspace_path - ) - found_files.append(relative_path) - - return found_files |