aboutsummaryrefslogtreecommitdiff
path: root/autogpt/commands/file_operations.py
diff options
context:
space:
mode:
Diffstat (limited to 'autogpt/commands/file_operations.py')
-rw-r--r--autogpt/commands/file_operations.py340
1 files changed, 0 insertions, 340 deletions
diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py
deleted file mode 100644
index 0a06da318..000000000
--- a/autogpt/commands/file_operations.py
+++ /dev/null
@@ -1,340 +0,0 @@
-"""File operations for AutoGPT"""
-from __future__ import annotations
-
-import contextlib
-import hashlib
-import os
-import os.path
-from pathlib import Path
-from typing import Generator, Literal
-
-from autogpt.agent.agent import Agent
-from autogpt.command_decorator import command
-from autogpt.logs import logger
-from autogpt.memory.vector import MemoryItem, VectorMemory
-
-from .decorators import sanitize_path_arg
-from .file_operations_utils import read_textual_file
-
-Operation = Literal["write", "append", "delete"]
-
-
-def text_checksum(text: str) -> str:
- """Get the hex checksum for the given text."""
- return hashlib.md5(text.encode("utf-8")).hexdigest()
-
-
-def operations_from_log(
- log_path: str,
-) -> Generator[tuple[Operation, str, str | None], None, None]:
- """Parse the file operations log and return a tuple containing the log entries"""
- try:
- log = open(log_path, "r", encoding="utf-8")
- except FileNotFoundError:
- return
-
- for line in log:
- line = line.replace("File Operation Logger", "").strip()
- if not line:
- continue
- operation, tail = line.split(": ", maxsplit=1)
- operation = operation.strip()
- if operation in ("write", "append"):
- try:
- path, checksum = (x.strip() for x in tail.rsplit(" #", maxsplit=1))
- except ValueError:
- logger.warn(f"File log entry lacks checksum: '{line}'")
- path, checksum = tail.strip(), None
- yield (operation, path, checksum)
- elif operation == "delete":
- yield (operation, tail.strip(), None)
-
- log.close()
-
-
-def file_operations_state(log_path: str) -> dict[str, str]:
- """Iterates over the operations log and returns the expected state.
-
- Parses a log file at config.file_logger_path to construct a dictionary that maps
- each file path written or appended to its checksum. Deleted files are removed
- from the dictionary.
-
- Returns:
- A dictionary mapping file paths to their checksums.
-
- Raises:
- FileNotFoundError: If config.file_logger_path is not found.
- ValueError: If the log file content is not in the expected format.
- """
- state = {}
- for operation, path, checksum in operations_from_log(log_path):
- if operation in ("write", "append"):
- state[path] = checksum
- elif operation == "delete":
- del state[path]
- return state
-
-
-@sanitize_path_arg("filename")
-def is_duplicate_operation(
- operation: Operation, filename: str, agent: Agent, checksum: str | None = None
-) -> bool:
- """Check if the operation has already been performed
-
- Args:
- operation: The operation to check for
- filename: The name of the file to check for
- agent: The agent
- checksum: The checksum of the contents to be written
-
- Returns:
- True if the operation has already been performed on the file
- """
- # Make the filename into a relative path if possible
- with contextlib.suppress(ValueError):
- filename = str(Path(filename).relative_to(agent.workspace.root))
-
- state = file_operations_state(agent.config.file_logger_path)
- if operation == "delete" and filename not in state:
- return True
- if operation == "write" and state.get(filename) == checksum:
- return True
- return False
-
-
-@sanitize_path_arg("filename")
-def log_operation(
- operation: Operation, filename: str, agent: Agent, checksum: str | None = None
-) -> None:
- """Log the file operation to the file_logger.txt
-
- Args:
- operation: The operation to log
- filename: The name of the file the operation was performed on
- checksum: The checksum of the contents to be written
- """
- # Make the filename into a relative path if possible
- with contextlib.suppress(ValueError):
- filename = str(Path(filename).relative_to(agent.workspace.root))
-
- log_entry = f"{operation}: {filename}"
- if checksum is not None:
- log_entry += f" #{checksum}"
- logger.debug(f"Logging file operation: {log_entry}")
- append_to_file(
- agent.config.file_logger_path, f"{log_entry}\n", agent, should_log=False
- )
-
-
-@command(
- "read_file",
- "Read an existing file",
- {
- "filename": {
- "type": "string",
- "description": "The path of the file to read",
- "required": True,
- }
- },
-)
-@sanitize_path_arg("filename")
-def read_file(filename: str, agent: Agent) -> str:
- """Read a file and return the contents
-
- Args:
- filename (str): The name of the file to read
-
- Returns:
- str: The contents of the file
- """
- try:
- content = read_textual_file(filename, logger)
-
- # TODO: invalidate/update memory when file is edited
- file_memory = MemoryItem.from_text_file(content, filename, agent.config)
- if len(file_memory.chunks) > 1:
- return file_memory.summary
-
- return content
- except Exception as e:
- return f"Error: {str(e)}"
-
-
-def ingest_file(
- filename: str,
- memory: VectorMemory,
-) -> None:
- """
- Ingest a file by reading its content, splitting it into chunks with a specified
- maximum length and overlap, and adding the chunks to the memory storage.
-
- Args:
- filename: The name of the file to ingest
- memory: An object with an add() method to store the chunks in memory
- """
- try:
- logger.info(f"Ingesting file {filename}")
- content = read_file(filename)
-
- # TODO: differentiate between different types of files
- file_memory = MemoryItem.from_text_file(content, filename)
- logger.debug(f"Created memory: {file_memory.dump(True)}")
- memory.add(file_memory)
-
- logger.info(f"Ingested {len(file_memory.e_chunks)} chunks from {filename}")
- except Exception as err:
- logger.warn(f"Error while ingesting file '{filename}': {err}")
-
-
-@command(
- "write_to_file",
- "Writes to a file",
- {
- "filename": {
- "type": "string",
- "description": "The name of the file to write to",
- "required": True,
- },
- "text": {
- "type": "string",
- "description": "The text to write to the file",
- "required": True,
- },
- },
- aliases=["write_file", "create_file"],
-)
-@sanitize_path_arg("filename")
-def write_to_file(filename: str, text: str, agent: Agent) -> str:
- """Write text to a file
-
- Args:
- filename (str): The name of the file to write to
- text (str): The text to write to the file
-
- Returns:
- str: A message indicating success or failure
- """
- checksum = text_checksum(text)
- if is_duplicate_operation("write", filename, agent, checksum):
- return "Error: File has already been updated."
- try:
- directory = os.path.dirname(filename)
- os.makedirs(directory, exist_ok=True)
- with open(filename, "w", encoding="utf-8") as f:
- f.write(text)
- log_operation("write", filename, agent, checksum)
- return "File written to successfully."
- except Exception as err:
- return f"Error: {err}"
-
-
-@command(
- "append_to_file",
- "Appends to a file",
- {
- "filename": {
- "type": "string",
- "description": "The name of the file to write to",
- "required": True,
- },
- "text": {
- "type": "string",
- "description": "The text to write to the file",
- "required": True,
- },
- },
-)
-@sanitize_path_arg("filename")
-def append_to_file(
- filename: str, text: str, agent: Agent, should_log: bool = True
-) -> str:
- """Append text to a file
-
- Args:
- filename (str): The name of the file to append to
- text (str): The text to append to the file
- should_log (bool): Should log output
-
- Returns:
- str: A message indicating success or failure
- """
- try:
- directory = os.path.dirname(filename)
- os.makedirs(directory, exist_ok=True)
- with open(filename, "a", encoding="utf-8") as f:
- f.write(text)
-
- if should_log:
- with open(filename, "r", encoding="utf-8") as f:
- checksum = text_checksum(f.read())
- log_operation("append", filename, agent, checksum=checksum)
-
- return "Text appended successfully."
- except Exception as err:
- return f"Error: {err}"
-
-
-@command(
- "delete_file",
- "Deletes a file",
- {
- "filename": {
- "type": "string",
- "description": "The name of the file to delete",
- "required": True,
- }
- },
-)
-@sanitize_path_arg("filename")
-def delete_file(filename: str, agent: Agent) -> str:
- """Delete a file
-
- Args:
- filename (str): The name of the file to delete
-
- Returns:
- str: A message indicating success or failure
- """
- if is_duplicate_operation("delete", filename, agent):
- return "Error: File has already been deleted."
- try:
- os.remove(filename)
- log_operation("delete", filename, agent)
- return "File deleted successfully."
- except Exception as err:
- return f"Error: {err}"
-
-
-@command(
- "list_files",
- "Lists Files in a Directory",
- {
- "directory": {
- "type": "string",
- "description": "The directory to list files in",
- "required": True,
- }
- },
-)
-@sanitize_path_arg("directory")
-def list_files(directory: str, agent: Agent) -> list[str]:
- """lists files in a directory recursively
-
- Args:
- directory (str): The directory to search in
-
- Returns:
- list[str]: A list of files found in the directory
- """
- found_files = []
-
- for root, _, files in os.walk(directory):
- for file in files:
- if file.startswith("."):
- continue
- relative_path = os.path.relpath(
- os.path.join(root, file), agent.config.workspace_path
- )
- found_files.append(relative_path)
-
- return found_files