path: root/autogpts/autogpt/tests/unit/test_file_operations.py
Diffstat (limited to 'autogpts/autogpt/tests/unit/test_file_operations.py')
-rw-r--r--    autogpts/autogpt/tests/unit/test_file_operations.py    331
1 file changed, 331 insertions(+), 0 deletions(-)
diff --git a/autogpts/autogpt/tests/unit/test_file_operations.py b/autogpts/autogpt/tests/unit/test_file_operations.py
new file mode 100644
index 000000000..21ebd0b32
--- /dev/null
+++ b/autogpts/autogpt/tests/unit/test_file_operations.py
@@ -0,0 +1,331 @@
+import hashlib
+import os
+import re
+from io import TextIOWrapper
+from pathlib import Path
+
+import pytest
+from pytest_mock import MockerFixture
+
+import autogpt.commands.file_operations as file_ops
+from autogpt.agents.agent import Agent
+from autogpt.agents.utils.exceptions import DuplicateOperationError
+from autogpt.config import Config
+from autogpt.file_workspace import FileWorkspace
+from autogpt.memory.vector.memory_item import MemoryItem
+from autogpt.memory.vector.utils import Embedding
+
+
+@pytest.fixture()
+def file_content():
+ return "This is a test file.\n"
+
+
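+# Replace MemoryItemFactory.from_text with a stub that builds MemoryItems from the
+# mock embedding, so the tests don't need a real embedding backend.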
+@pytest.fixture()
+def mock_MemoryItem_from_text(
+ mocker: MockerFixture, mock_embedding: Embedding, config: Config
+):
+ mocker.patch.object(
+ file_ops.MemoryItemFactory,
+ "from_text",
+ new=lambda content, source_type, config, metadata: MemoryItem(
+ raw_content=content,
+ summary=f"Summary of content '{content}'",
+ chunk_summaries=[f"Summary of content '{content}'"],
+ chunks=[content],
+ e_summary=mock_embedding,
+ e_chunks=[mock_embedding],
+ metadata=metadata | {"source_type": source_type},
+ ),
+ )
+
+
+@pytest.fixture()
+def test_file_name():
+ return Path("test_file.txt")
+
+
+@pytest.fixture()
+def test_file_path(test_file_name: Path, workspace: FileWorkspace):
+ return workspace.get_path(test_file_name)
+
+
+@pytest.fixture()
+def test_file(test_file_path: Path):
+ file = open(test_file_path, "w")
+ yield file
+ if not file.closed:
+ file.close()
+
+
+@pytest.fixture()
+def test_file_with_content_path(test_file: TextIOWrapper, file_content, agent: Agent):
+ test_file.write(file_content)
+ test_file.close()
+ file_ops.log_operation(
+ "write", Path(test_file.name), agent, file_ops.text_checksum(file_content)
+ )
+ return Path(test_file.name)
+
+
+@pytest.fixture()
+def test_directory(workspace: FileWorkspace):
+ return workspace.get_path("test_directory")
+
+
+@pytest.fixture()
+def test_nested_file(workspace: FileWorkspace):
+ return workspace.get_path("nested/test_file.txt")
+
+
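+# operations_from_log should yield an (operation, path, checksum) tuple per log entry,
+# with None where no checksum was recorded.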
+def test_file_operations_log(test_file: TextIOWrapper):
+ log_file_content = (
+ "File Operation Logger\n"
+ "write: path/to/file1.txt #checksum1\n"
+ "write: path/to/file2.txt #checksum2\n"
+ "write: path/to/file3.txt #checksum3\n"
+ "append: path/to/file2.txt #checksum4\n"
+ "delete: path/to/file3.txt\n"
+ )
+ test_file.write(log_file_content)
+ test_file.close()
+
+ expected = [
+ ("write", "path/to/file1.txt", "checksum1"),
+ ("write", "path/to/file2.txt", "checksum2"),
+ ("write", "path/to/file3.txt", "checksum3"),
+ ("append", "path/to/file2.txt", "checksum4"),
+ ("delete", "path/to/file3.txt", None),
+ ]
+ assert list(file_ops.operations_from_log(test_file.name)) == expected
+
+
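+# file_operations_state should collapse the log to the newest checksum per file
+# and drop deleted files.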
+def test_file_operations_state(test_file: TextIOWrapper):
+ # Prepare a fake log file
+ log_file_content = (
+ "File Operation Logger\n"
+ "write: path/to/file1.txt #checksum1\n"
+ "write: path/to/file2.txt #checksum2\n"
+ "write: path/to/file3.txt #checksum3\n"
+ "append: path/to/file2.txt #checksum4\n"
+ "delete: path/to/file3.txt\n"
+ )
+ test_file.write(log_file_content)
+ test_file.close()
+
+ # Call the function and check the returned dictionary
+ expected_state = {
+ "path/to/file1.txt": "checksum1",
+ "path/to/file2.txt": "checksum4",
+ }
+ assert file_ops.file_operations_state(test_file.name) == expected_state
+
+
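+# is_duplicate_operation checks a proposed operation against the latest logged state
+# for each file.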
+def test_is_duplicate_operation(agent: Agent, mocker: MockerFixture):
+ # Prepare a fake state dictionary for the function to use
+ state = {
+ "path/to/file1.txt": "checksum1",
+ "path/to/file2.txt": "checksum2",
+ }
+ mocker.patch.object(file_ops, "file_operations_state", lambda _: state)
+
+ # Test cases with write operations
+ assert (
+ file_ops.is_duplicate_operation(
+ "write", Path("path/to/file1.txt"), agent, "checksum1"
+ )
+ is True
+ )
+ assert (
+ file_ops.is_duplicate_operation(
+ "write", Path("path/to/file1.txt"), agent, "checksum2"
+ )
+ is False
+ )
+ assert (
+ file_ops.is_duplicate_operation(
+ "write", Path("path/to/file3.txt"), agent, "checksum3"
+ )
+ is False
+ )
+ # Test cases with append operations
+ assert (
+ file_ops.is_duplicate_operation(
+ "append", Path("path/to/file1.txt"), agent, "checksum1"
+ )
+ is False
+ )
+ # Test cases with delete operations
+ assert (
+ file_ops.is_duplicate_operation("delete", Path("path/to/file1.txt"), agent)
+ is False
+ )
+ assert (
+ file_ops.is_duplicate_operation("delete", Path("path/to/file3.txt"), agent)
+ is True
+ )
+
+
+# Test logging a file operation
+def test_log_operation(agent: Agent):
+ file_ops.log_operation("log_test", Path("path/to/test"), agent=agent)
+ with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
+ content = f.read()
+ assert "log_test: path/to/test\n" in content
+
+
+def test_text_checksum(file_content: str):
+ checksum = file_ops.text_checksum(file_content)
+ different_checksum = file_ops.text_checksum("other content")
+ assert re.match(r"^[a-fA-F0-9]+$", checksum) is not None
+ assert checksum != different_checksum
+
+
+def test_log_operation_with_checksum(agent: Agent):
+ file_ops.log_operation(
+ "log_test", Path("path/to/test"), agent=agent, checksum="ABCDEF"
+ )
+ with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
+ content = f.read()
+ assert "log_test: path/to/test #ABCDEF\n" in content
+
+
+def test_read_file(
+ mock_MemoryItem_from_text,
+ test_file_with_content_path: Path,
+ file_content,
+ agent: Agent,
+):
+ content = file_ops.read_file(test_file_with_content_path, agent=agent)
+ assert content.replace("\r", "") == file_content
+
+
+def test_read_file_not_found(agent: Agent):
+ filename = "does_not_exist.txt"
+ with pytest.raises(FileNotFoundError):
+ file_ops.read_file(filename, agent=agent)
+
+
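+# write_to_file should resolve a relative path against the agent's workspace.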
+@pytest.mark.asyncio
+async def test_write_to_file_relative_path(test_file_name: Path, agent: Agent):
+ new_content = "This is new content.\n"
+ await file_ops.write_to_file(test_file_name, new_content, agent=agent)
+ with open(agent.workspace.get_path(test_file_name), "r", encoding="utf-8") as f:
+ content = f.read()
+ assert content == new_content
+
+
+@pytest.mark.asyncio
+async def test_write_to_file_absolute_path(test_file_path: Path, agent: Agent):
+ new_content = "This is new content.\n"
+ await file_ops.write_to_file(test_file_path, new_content, agent=agent)
+ with open(test_file_path, "r", encoding="utf-8") as f:
+ content = f.read()
+ assert content == new_content
+
+
+@pytest.mark.asyncio
+async def test_write_file_logs_checksum(test_file_name: Path, agent: Agent):
+ new_content = "This is new content.\n"
+ new_checksum = file_ops.text_checksum(new_content)
+ await file_ops.write_to_file(test_file_name, new_content, agent=agent)
+ with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
+ log_entry = f.read()
+ assert log_entry == f"write: {test_file_name} #{new_checksum}\n"
+
+
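+# Writing content whose checksum is already logged for the file should raise
+# DuplicateOperationError.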
+@pytest.mark.asyncio
+async def test_write_file_fails_if_content_exists(test_file_name: Path, agent: Agent):
+ new_content = "This is new content.\n"
+ file_ops.log_operation(
+ "write",
+ test_file_name,
+ agent=agent,
+ checksum=file_ops.text_checksum(new_content),
+ )
+ with pytest.raises(DuplicateOperationError):
+ await file_ops.write_to_file(test_file_name, new_content, agent=agent)
+
+
+@pytest.mark.asyncio
+async def test_write_file_succeeds_if_content_different(
+ test_file_with_content_path: Path, agent: Agent
+):
+ new_content = "This is different content.\n"
+ await file_ops.write_to_file(test_file_with_content_path, new_content, agent=agent)
+
+
+@pytest.mark.asyncio
+async def test_append_to_file(test_nested_file: Path, agent: Agent):
+ append_text = "This is appended text.\n"
+ await file_ops.write_to_file(test_nested_file, append_text, agent=agent)
+
+ file_ops.append_to_file(test_nested_file, append_text, agent=agent)
+
+ with open(test_nested_file, "r") as f:
+ content_after = f.read()
+
+ assert content_after == append_text + append_text
+
+
+def test_append_to_file_uses_checksum_from_appended_file(
+ test_file_name: Path, agent: Agent
+):
+ append_text = "This is appended text.\n"
+ file_ops.append_to_file(
+ agent.workspace.get_path(test_file_name),
+ append_text,
+ agent=agent,
+ )
+ file_ops.append_to_file(
+ agent.workspace.get_path(test_file_name),
+ append_text,
+ agent=agent,
+ )
+ with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
+ log_contents = f.read()
+
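+    # Each logged checksum covers the full file contents after that append,
+    # hence the single cumulative digest below.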
+ digest = hashlib.md5()
+ digest.update(append_text.encode("utf-8"))
+ checksum1 = digest.hexdigest()
+ digest.update(append_text.encode("utf-8"))
+ checksum2 = digest.hexdigest()
+ assert log_contents == (
+ f"append: {test_file_name} #{checksum1}\n"
+ f"append: {test_file_name} #{checksum2}\n"
+ )
+
+
+def test_list_files(workspace: FileWorkspace, test_directory: Path, agent: Agent):
+    # Case 1: Create files A and B (plus a copy of A in a subdirectory) and ensure
+    # listing the workspace returns all of them
+ file_a = workspace.get_path("file_a.txt")
+ file_b = workspace.get_path("file_b.txt")
+
+ with open(file_a, "w") as f:
+ f.write("This is file A.")
+
+ with open(file_b, "w") as f:
+ f.write("This is file B.")
+
+ # Create a subdirectory and place a copy of file_a in it
+ if not os.path.exists(test_directory):
+ os.makedirs(test_directory)
+
+ with open(os.path.join(test_directory, file_a.name), "w") as f:
+ f.write("This is file A in the subdirectory.")
+
+ files = file_ops.list_folder(str(workspace.root), agent=agent)
+ assert file_a.name in files
+ assert file_b.name in files
+ assert os.path.join(Path(test_directory).name, file_a.name) in files
+
+ # Clean up
+ os.remove(file_a)
+ os.remove(file_b)
+ os.remove(os.path.join(test_directory, file_a.name))
+ os.rmdir(test_directory)
+
+    # Case 2: List the workspace and make sure a file that was never created is not returned
+ non_existent_file = "non_existent_file.txt"
+ files = file_ops.list_folder("", agent=agent)
+ assert non_existent_file not in files