path: root/autogpts/autogpt/tests/unit/test_s3_file_workspace.py
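"""Unit tests for the S3FileWorkspace backend.

These tests talk to a real S3-compatible backend and are skipped at module
level unless the S3_ENDPOINT_URL or AWS_ACCESS_KEY_ID environment variable
is set.
"""
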
import os
import uuid
from pathlib import Path

import pytest
import pytest_asyncio
from botocore.exceptions import ClientError

from autogpt.file_workspace.s3 import S3FileWorkspace, S3FileWorkspaceConfiguration

if not os.getenv("S3_ENDPOINT_URL") and not os.getenv("AWS_ACCESS_KEY_ID"):
    pytest.skip("S3 environment variables are not set", allow_module_level=True)


@pytest.fixture
def s3_bucket_name() -> str:
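    # Random suffix to avoid bucket name collisions between test runs.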
    return f"test-bucket-{str(uuid.uuid4())[:8]}"


@pytest.fixture
def s3_workspace_uninitialized(s3_bucket_name: str) -> S3FileWorkspace:
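    # S3FileWorkspaceConfiguration.from_env() picks up the bucket name from
    # the WORKSPACE_STORAGE_BUCKET environment variable set here.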
    os.environ["WORKSPACE_STORAGE_BUCKET"] = s3_bucket_name
    ws_config = S3FileWorkspaceConfiguration.from_env()
    ws_config.root = Path("/workspaces/AutoGPT-some-unique-task-id")
    workspace = S3FileWorkspace(ws_config)
    yield workspace  # type: ignore
    del os.environ["WORKSPACE_STORAGE_BUCKET"]


def test_initialize(s3_bucket_name: str, s3_workspace_uninitialized: S3FileWorkspace):
    s3 = s3_workspace_uninitialized._s3

    # test that the bucket doesn't exist yet
    with pytest.raises(ClientError):
        s3.meta.client.head_bucket(Bucket=s3_bucket_name)

    s3_workspace_uninitialized.initialize()

    # test that the bucket has been created
    s3.meta.client.head_bucket(Bucket=s3_bucket_name)


def test_workspace_bucket_name(
    s3_workspace: S3FileWorkspace,
    s3_bucket_name: str,
):
    assert s3_workspace._bucket.name == s3_bucket_name


@pytest.fixture
def s3_workspace(s3_workspace_uninitialized: S3FileWorkspace) -> S3FileWorkspace:
    (s3_workspace := s3_workspace_uninitialized).initialize()
    yield s3_workspace  # type: ignore

    # Empty & delete the test bucket
    s3_workspace._bucket.objects.all().delete()
    s3_workspace._bucket.delete()


NESTED_DIR = "existing/test/dir"
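# A mix of str and Path file names, including one nested under NESTED_DIR.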
TEST_FILES: list[tuple[str | Path, str]] = [
    ("existing_test_file_1", "test content 1"),
    ("existing_test_file_2.txt", "test content 2"),
    (Path("existing_test_file_3"), "test content 3"),
    (Path(f"{NESTED_DIR}/test/file/4"), "test content 4"),
]


@pytest_asyncio.fixture
async def s3_workspace_with_files(s3_workspace: S3FileWorkspace) -> S3FileWorkspace:
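    # Seed the bucket directly via the boto3 bucket resource rather than
    # going through the workspace's own write_file().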
    for file_name, file_content in TEST_FILES:
        s3_workspace._bucket.Object(str(s3_workspace.get_path(file_name))).put(
            Body=file_content
        )
    yield s3_workspace  # type: ignore


@pytest.mark.asyncio
async def test_read_file(s3_workspace_with_files: S3FileWorkspace):
    for file_name, file_content in TEST_FILES:
        content = s3_workspace_with_files.read_file(file_name)
        assert content == file_content

    with pytest.raises(ClientError):
        s3_workspace_with_files.read_file("non_existent_file")


@pytest.mark.asyncio
async def test_list_files(s3_workspace_with_files: S3FileWorkspace):
    # List at root level
    assert (files := s3_workspace_with_files.list()) == s3_workspace_with_files.list()
    assert len(files) > 0
    assert set(files) == set(Path(file_name) for file_name, _ in TEST_FILES)

    # List at nested path
    assert (
        nested_files := s3_workspace_with_files.list(NESTED_DIR)
    ) == s3_workspace_with_files.list(NESTED_DIR)
    assert len(nested_files) > 0
    assert set(nested_files) == set(
        p.relative_to(NESTED_DIR)
        for file_name, _ in TEST_FILES
        if (p := Path(file_name)).is_relative_to(NESTED_DIR)
    )


@pytest.mark.asyncio
async def test_write_read_file(s3_workspace: S3FileWorkspace):
    await s3_workspace.write_file("test_file", "test_content")
    assert s3_workspace.read_file("test_file") == "test_content"


@pytest.mark.asyncio
async def test_overwrite_file(s3_workspace_with_files: S3FileWorkspace):
    for file_name, _ in TEST_FILES:
        await s3_workspace_with_files.write_file(file_name, "new content")
        assert s3_workspace_with_files.read_file(file_name) == "new content"


@pytest.mark.asyncio
async def test_delete_file(s3_workspace_with_files: S3FileWorkspace):
    for file_to_delete, _ in TEST_FILES:
        s3_workspace_with_files.delete_file(file_to_delete)
        with pytest.raises(ClientError):
            s3_workspace_with_files.read_file(file_to_delete)