aboutsummaryrefslogtreecommitdiff
path: root/autogpts/autogpt/tests/unit/test_s3_file_storage.py
blob: 82bd5428c5c85d2c9a446252e08256b4677f9065 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import uuid
from pathlib import Path

import pytest
import pytest_asyncio
from botocore.exceptions import ClientError

from autogpt.file_storage.s3 import S3FileStorage, S3FileStorageConfiguration

if not (os.getenv("S3_ENDPOINT_URL") and os.getenv("AWS_ACCESS_KEY_ID")):
    pytest.skip("S3 environment variables are not set", allow_module_level=True)


@pytest.fixture
def s3_bucket_name() -> str:
    return f"test-bucket-{str(uuid.uuid4())[:8]}"


@pytest.fixture
def s3_root() -> Path:
    return Path("/workspaces/AutoGPT-some-unique-task-id")


@pytest.fixture
def s3_storage_uninitialized(s3_bucket_name: str, s3_root: Path) -> S3FileStorage:
    os.environ["STORAGE_BUCKET"] = s3_bucket_name
    storage_config = S3FileStorageConfiguration.from_env()
    storage_config.root = s3_root
    storage = S3FileStorage(storage_config)
    yield storage  # type: ignore
    del os.environ["STORAGE_BUCKET"]


def test_initialize(s3_bucket_name: str, s3_storage_uninitialized: S3FileStorage):
    s3 = s3_storage_uninitialized._s3

    # test that the bucket doesn't exist yet
    with pytest.raises(ClientError):
        s3.meta.client.head_bucket(Bucket=s3_bucket_name)

    s3_storage_uninitialized.initialize()

    # test that the bucket has been created
    s3.meta.client.head_bucket(Bucket=s3_bucket_name)


def test_workspace_bucket_name(
    s3_storage: S3FileStorage,
    s3_bucket_name: str,
):
    assert s3_storage._bucket.name == s3_bucket_name


@pytest.fixture
def s3_storage(s3_storage_uninitialized: S3FileStorage) -> S3FileStorage:
    (s3_storage := s3_storage_uninitialized).initialize()
    yield s3_storage  # type: ignore

    # Empty & delete the test bucket
    s3_storage._bucket.objects.all().delete()
    s3_storage._bucket.delete()


NESTED_DIR = "existing/test/dir"
TEST_FILES: list[tuple[str | Path, str]] = [
    ("existing_test_file_1", "test content 1"),
    ("existing_test_file_2.txt", "test content 2"),
    (Path("existing_test_file_3"), "test content 3"),
    (Path(f"{NESTED_DIR}/test_file_4"), "test content 4"),
]


@pytest_asyncio.fixture
async def s3_storage_with_files(s3_storage: S3FileStorage) -> S3FileStorage:
    for file_name, file_content in TEST_FILES:
        s3_storage._bucket.Object(str(s3_storage.get_path(file_name))).put(
            Body=file_content
        )
    yield s3_storage  # type: ignore


@pytest.mark.asyncio
async def test_read_file(s3_storage_with_files: S3FileStorage):
    for file_name, file_content in TEST_FILES:
        content = s3_storage_with_files.read_file(file_name)
        assert content == file_content

    with pytest.raises(ClientError):
        s3_storage_with_files.read_file("non_existent_file")


def test_list_files(s3_storage_with_files: S3FileStorage):
    # List at root level
    assert (
        files := s3_storage_with_files.list_files()
    ) == s3_storage_with_files.list_files()
    assert len(files) > 0
    assert set(files) == set(Path(file_name) for file_name, _ in TEST_FILES)

    # List at nested path
    assert (
        nested_files := s3_storage_with_files.list_files(NESTED_DIR)
    ) == s3_storage_with_files.list_files(NESTED_DIR)
    assert len(nested_files) > 0
    assert set(nested_files) == set(
        p.relative_to(NESTED_DIR)
        for file_name, _ in TEST_FILES
        if (p := Path(file_name)).is_relative_to(NESTED_DIR)
    )


def test_list_folders(s3_storage_with_files: S3FileStorage):
    # List recursive
    folders = s3_storage_with_files.list_folders(recursive=True)
    assert len(folders) > 0
    assert set(folders) == {
        Path("existing"),
        Path("existing/test"),
        Path("existing/test/dir"),
    }
    # List non-recursive
    folders = s3_storage_with_files.list_folders(recursive=False)
    assert len(folders) > 0
    assert set(folders) == {Path("existing")}


@pytest.mark.asyncio
async def test_write_read_file(s3_storage: S3FileStorage):
    await s3_storage.write_file("test_file", "test_content")
    assert s3_storage.read_file("test_file") == "test_content"


@pytest.mark.asyncio
async def test_overwrite_file(s3_storage_with_files: S3FileStorage):
    for file_name, _ in TEST_FILES:
        await s3_storage_with_files.write_file(file_name, "new content")
        assert s3_storage_with_files.read_file(file_name) == "new content"


def test_delete_file(s3_storage_with_files: S3FileStorage):
    for file_to_delete, _ in TEST_FILES:
        s3_storage_with_files.delete_file(file_to_delete)
        with pytest.raises(ClientError):
            s3_storage_with_files.read_file(file_to_delete)


def test_exists(s3_storage_with_files: S3FileStorage):
    for file_name, _ in TEST_FILES:
        assert s3_storage_with_files.exists(file_name)

    assert not s3_storage_with_files.exists("non_existent_file")


def test_rename_file(s3_storage_with_files: S3FileStorage):
    for file_name, _ in TEST_FILES:
        new_name = str(file_name) + "_renamed"
        s3_storage_with_files.rename(file_name, new_name)
        assert s3_storage_with_files.exists(new_name)
        assert not s3_storage_with_files.exists(file_name)


def test_rename_dir(s3_storage_with_files: S3FileStorage):
    s3_storage_with_files.rename(NESTED_DIR, "existing/test/dir_renamed")
    assert s3_storage_with_files.exists("existing/test/dir_renamed")
    assert not s3_storage_with_files.exists(NESTED_DIR)


def test_clone(s3_storage_with_files: S3FileStorage, s3_root: Path):
    cloned = s3_storage_with_files.clone_with_subroot("existing/test")
    assert cloned.root == s3_root / Path("existing/test")
    assert cloned._bucket.name == s3_storage_with_files._bucket.name
    assert cloned.exists("dir")
    assert cloned.exists("dir/test_file_4")


@pytest.mark.asyncio
async def test_copy_file(storage: S3FileStorage):
    await storage.write_file("test_file.txt", "test content")
    storage.copy("test_file.txt", "test_file_copy.txt")
    storage.make_dir("dir")
    storage.copy("test_file.txt", "dir/test_file_copy.txt")
    assert storage.read_file("test_file_copy.txt") == "test content"
    assert storage.read_file("dir/test_file_copy.txt") == "test content"


@pytest.mark.asyncio
async def test_copy_dir(storage: S3FileStorage):
    storage.make_dir("dir")
    storage.make_dir("dir/sub_dir")
    await storage.write_file("dir/test_file.txt", "test content")
    await storage.write_file("dir/sub_dir/test_file.txt", "test content")
    storage.copy("dir", "dir_copy")
    assert storage.read_file("dir_copy/test_file.txt") == "test content"
    assert storage.read_file("dir_copy/sub_dir/test_file.txt") == "test content"