aboutsummaryrefslogtreecommitdiff
path: root/autogpts/autogpt/tests/unit/test_gcs_file_storage.py
blob: f1348b62d38948024e8142bba9e8b1667763b56e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import os
import uuid
from pathlib import Path

import pytest
import pytest_asyncio
from google.auth.exceptions import GoogleAuthError
from google.cloud import storage
from google.cloud.exceptions import NotFound

from autogpt.file_storage.gcs import GCSFileStorage, GCSFileStorageConfiguration

# Probe for usable Google Cloud credentials by constructing a client; when
# authentication is not set up, skip every test in this module.
try:
    storage.Client()
except GoogleAuthError:
    pytest.skip(
        "Google Cloud Authentication not configured",
        allow_module_level=True,
    )


@pytest.fixture(scope="module")
def gcs_bucket_name() -> str:
    """Return a bucket name with a random 8-character suffix for this module."""
    # The first 8 hex digits of a UUID4 are the same characters as the first
    # 8 characters of its string form (which precede the first hyphen).
    random_suffix = uuid.uuid4().hex[:8]
    return f"test-bucket-{random_suffix}"


@pytest.fixture(scope="module")
def gcs_root() -> Path:
    """Return the root path assigned to the storage configuration under test."""
    root_path = Path("/workspaces/AutoGPT-some-unique-task-id")
    return root_path


@pytest.fixture(scope="module")
def gcs_storage_uninitialized(gcs_bucket_name: str, gcs_root: Path) -> GCSFileStorage:
    """Yield a `GCSFileStorage` on which `initialize()` has not been called.

    The storage configuration is read from the environment, so
    `STORAGE_BUCKET` is pointed at the test bucket name for the lifetime of
    the fixture. On teardown (or if building the configuration fails) the
    variable is put back to whatever it was before, instead of being deleted
    unconditionally.

    Args:
        gcs_bucket_name: Name of the bucket the storage should use.
        gcs_root: Root path assigned to the storage configuration.

    Yields:
        The constructed, not yet initialized, storage object.
    """
    # Remember any pre-existing value so the fixture doesn't clobber it
    previous_bucket = os.environ.get("STORAGE_BUCKET")
    os.environ["STORAGE_BUCKET"] = gcs_bucket_name
    try:
        storage_config = GCSFileStorageConfiguration.from_env()
        storage_config.root = gcs_root
        # Yield the instance directly: binding it to a local named `storage`
        # would shadow the imported `google.cloud.storage` module.
        yield GCSFileStorage(storage_config)  # type: ignore
    finally:
        if previous_bucket is None:
            os.environ.pop("STORAGE_BUCKET", None)
        else:
            os.environ["STORAGE_BUCKET"] = previous_bucket


def test_initialize(gcs_bucket_name: str, gcs_storage_uninitialized: GCSFileStorage):
    """Check that `initialize()` makes the previously missing bucket available."""
    client = gcs_storage_uninitialized._gcs

    # Before initialization, looking the bucket up must fail with NotFound
    with pytest.raises(NotFound):
        client.get_bucket(gcs_bucket_name)

    gcs_storage_uninitialized.initialize()

    # After initialization, the same lookup must succeed
    created_bucket = client.get_bucket(gcs_bucket_name)

    # Clean up: remove the bucket again so later fixtures start fresh
    created_bucket.delete(force=True)


@pytest.fixture(scope="module")
def gcs_storage(gcs_storage_uninitialized: GCSFileStorage) -> GCSFileStorage:
    """Yield the module's storage object after calling `initialize()` on it.

    On teardown the storage's bucket is force-deleted.
    """
    initialized_storage = gcs_storage_uninitialized
    initialized_storage.initialize()
    yield initialized_storage  # type: ignore

    # Teardown: empty & delete the test bucket
    initialized_storage._bucket.delete(force=True)


def test_workspace_bucket_name(
    gcs_storage: GCSFileStorage,
    gcs_bucket_name: str,
):
    """The initialized storage must use the bucket named by the fixture."""
    actual_name = gcs_storage._bucket.name
    assert actual_name == gcs_bucket_name


NESTED_DIR = "existing/test/dir"
TEST_FILES: list[tuple[str | Path, str]] = [
    ("existing_test_file_1", "test content 1"),
    ("existing_test_file_2.txt", "test content 2"),
    (Path("existing_test_file_3"), "test content 3"),
    (Path(f"{NESTED_DIR}/test_file_4"), "test content 4"),
]


@pytest_asyncio.fixture
async def gcs_storage_with_files(gcs_storage: GCSFileStorage) -> GCSFileStorage:
    """Yield the initialized storage with every `TEST_FILES` entry uploaded.

    The files are uploaded straight through the bucket object rather than
    through the storage's own write method.

    This fixture is set up per test while the bucket behind it is shared by
    the whole module, so on teardown every blob in the bucket is removed.
    Otherwise leftovers from one test (renamed, overwritten or newly written
    files) would be visible to later tests and make exact-listing assertions
    depend on execution order.
    """
    for file_name, file_content in TEST_FILES:
        blob_name = str(gcs_storage.get_path(file_name))
        gcs_storage._bucket.blob(blob_name).upload_from_string(file_content)
    yield gcs_storage  # type: ignore

    # Teardown: empty the bucket. `on_error` receives blobs that are already
    # gone, so a missing blob is ignored instead of failing the teardown.
    bucket = gcs_storage._bucket
    bucket.delete_blobs(list(bucket.list_blobs()), on_error=lambda blob: None)


@pytest.mark.asyncio
async def test_read_file(gcs_storage_with_files: GCSFileStorage):
    """Each uploaded file reads back its content; a missing file raises NotFound."""
    fs = gcs_storage_with_files

    for name, expected_content in TEST_FILES:
        assert fs.read_file(name) == expected_content

    with pytest.raises(NotFound):
        fs.read_file("non_existent_file")


def test_list_files(gcs_storage_with_files: GCSFileStorage):
    """Listing files at the root and at a nested path returns the expected sets."""
    fs = gcs_storage_with_files

    # Root level: two consecutive listings agree and match TEST_FILES
    files = fs.list_files()
    assert files == fs.list_files()
    assert len(files) > 0
    assert set(files) == {Path(name) for name, _ in TEST_FILES}

    # Nested path: results are compared as paths relative to NESTED_DIR
    nested_files = fs.list_files(NESTED_DIR)
    assert nested_files == fs.list_files(NESTED_DIR)
    assert len(nested_files) > 0

    expected_nested = set()
    for name, _ in TEST_FILES:
        path = Path(name)
        if path.is_relative_to(NESTED_DIR):
            expected_nested.add(path.relative_to(NESTED_DIR))
    assert set(nested_files) == expected_nested


def test_list_folders(gcs_storage_with_files: GCSFileStorage):
    """Folder listing returns every level when recursive, else only the top one."""
    # Recursive listing
    all_folders = gcs_storage_with_files.list_folders(recursive=True)
    expected_all = {
        Path(folder)
        for folder in ("existing", "existing/test", "existing/test/dir")
    }
    assert len(all_folders) > 0
    assert set(all_folders) == expected_all

    # Non-recursive listing
    top_level_folders = gcs_storage_with_files.list_folders(recursive=False)
    assert len(top_level_folders) > 0
    assert set(top_level_folders) == {Path("existing")}


@pytest.mark.asyncio
async def test_write_read_file(gcs_storage: GCSFileStorage):
    """Content written with `write_file` is returned unchanged by `read_file`.

    The written file is deleted afterwards: the bucket is shared by the whole
    module, and a leftover `test_file` would show up in the listings that
    other tests compare against an exact set.
    """
    await gcs_storage.write_file("test_file", "test_content")
    try:
        assert gcs_storage.read_file("test_file") == "test_content"
    finally:
        # Only reached once the write succeeded, so the file exists here
        gcs_storage.delete_file("test_file")


@pytest.mark.asyncio
async def test_overwrite_file(gcs_storage_with_files: GCSFileStorage):
    """Writing to an existing file replaces its content."""
    fs = gcs_storage_with_files
    replacement = "new content"

    for name, _ in TEST_FILES:
        await fs.write_file(name, replacement)
        assert fs.read_file(name) == replacement


def test_delete_file(gcs_storage_with_files: GCSFileStorage):
    """After `delete_file`, the file is no longer reported as existing."""
    fs = gcs_storage_with_files

    for name, _ in TEST_FILES:
        fs.delete_file(name)
        still_there = fs.exists(name)
        assert not still_there


def test_exists(gcs_storage_with_files: GCSFileStorage):
    """`exists` is true for every uploaded file and false for an unknown name."""
    fs = gcs_storage_with_files

    for name, _ in TEST_FILES:
        assert fs.exists(name)

    unknown_found = fs.exists("non_existent_file")
    assert not unknown_found


def test_rename_file(gcs_storage_with_files: GCSFileStorage):
    """Renaming a file makes the new name exist and the old name disappear."""
    fs = gcs_storage_with_files

    for old_name, _ in TEST_FILES:
        # f-string formatting of a Path yields the same text as str(path)
        renamed = f"{old_name}_renamed"
        fs.rename(old_name, renamed)
        assert fs.exists(renamed)
        assert not fs.exists(old_name)


def test_rename_dir(gcs_storage_with_files: GCSFileStorage):
    """Renaming the nested directory makes the new path exist and the old vanish."""
    fs = gcs_storage_with_files
    renamed_dir = "existing/test/dir_renamed"

    fs.rename(NESTED_DIR, renamed_dir)

    assert fs.exists(renamed_dir)
    assert not fs.exists(NESTED_DIR)


def test_clone(gcs_storage_with_files: GCSFileStorage, gcs_root: Path):
    """A clone with a sub-root shares the bucket and resolves paths below it."""
    subroot = "existing/test"
    clone = gcs_storage_with_files.clone_with_subroot(subroot)

    # Root is extended by the sub-root; the bucket is the same one
    assert clone.root == gcs_root / subroot
    assert clone._bucket.name == gcs_storage_with_files._bucket.name

    # Paths are now looked up relative to the sub-root
    for relative_path in ("dir", "dir/test_file_4"):
        assert clone.exists(relative_path)