aboutsummaryrefslogtreecommitdiff
path: root/autogpt/core/ability/builtins/file_operations.py
diff options
context:
space:
mode:
Diffstat (limited to 'autogpt/core/ability/builtins/file_operations.py')
-rw-r--r--autogpt/core/ability/builtins/file_operations.py167
1 files changed, 0 insertions, 167 deletions
diff --git a/autogpt/core/ability/builtins/file_operations.py b/autogpt/core/ability/builtins/file_operations.py
deleted file mode 100644
index 43cd0d0cd..000000000
--- a/autogpt/core/ability/builtins/file_operations.py
+++ /dev/null
@@ -1,167 +0,0 @@
-import logging
-import os
-
-from autogpt.core.ability.base import Ability, AbilityConfiguration
-from autogpt.core.ability.schema import AbilityResult, ContentType, Knowledge
-from autogpt.core.workspace import Workspace
-
-
-class ReadFile(Ability):
- default_configuration = AbilityConfiguration(
- packages_required=["unstructured"],
- workspace_required=True,
- )
-
- def __init__(
- self,
- logger: logging.Logger,
- workspace: Workspace,
- ):
- self._logger = logger
- self._workspace = workspace
-
- @property
- def description(self) -> str:
- return "Read and parse all text from a file."
-
- @property
- def arguments(self) -> dict:
- return {
- "filename": {
- "type": "string",
- "description": "The name of the file to read.",
- },
- }
-
- def _check_preconditions(self, filename: str) -> AbilityResult | None:
- message = ""
- try:
- pass
- except ImportError:
- message = "Package charset_normalizer is not installed."
-
- try:
- file_path = self._workspace.get_path(filename)
- if not file_path.exists():
- message = f"File {filename} does not exist."
- if not file_path.is_file():
- message = f"{filename} is not a file."
- except ValueError as e:
- message = str(e)
-
- if message:
- return AbilityResult(
- ability_name=self.name(),
- ability_args={"filename": filename},
- success=False,
- message=message,
- data=None,
- )
-
- def __call__(self, filename: str) -> AbilityResult:
- if result := self._check_preconditions(filename):
- return result
-
- from unstructured.partition.auto import partition
-
- file_path = self._workspace.get_path(filename)
- try:
- elements = partition(str(file_path))
- # TODO: Lots of other potentially useful information is available
- # in the partitioned file. Consider returning more of it.
- new_knowledge = Knowledge(
- content="\n\n".join([element.text for element in elements]),
- content_type=ContentType.TEXT,
- content_metadata={"filename": filename},
- )
- success = True
- message = f"File {file_path} read successfully."
- except IOError as e:
- new_knowledge = None
- success = False
- message = str(e)
-
- return AbilityResult(
- ability_name=self.name(),
- ability_args={"filename": filename},
- success=success,
- message=message,
- new_knowledge=new_knowledge,
- )
-
-
-class WriteFile(Ability):
- default_configuration = AbilityConfiguration(
- packages_required=["unstructured"],
- workspace_required=True,
- )
-
- def __init__(
- self,
- logger: logging.Logger,
- workspace: Workspace,
- ):
- self._logger = logger
- self._workspace = workspace
-
- @property
- def description(self) -> str:
- return "Write text to a file."
-
- @property
- def arguments(self) -> dict:
- return {
- "filename": {
- "type": "string",
- "description": "The name of the file to write.",
- },
- "contents": {
- "type": "string",
- "description": "The contents of the file to write.",
- },
- }
-
- def _check_preconditions(
- self, filename: str, contents: str
- ) -> AbilityResult | None:
- message = ""
- try:
- file_path = self._workspace.get_path(filename)
- if file_path.exists():
- message = f"File {filename} already exists."
- if len(contents):
- message = f"File {filename} was not given any content."
- except ValueError as e:
- message = str(e)
-
- if message:
- return AbilityResult(
- ability_name=self.name(),
- ability_args={"filename": filename, "contents": contents},
- success=False,
- message=message,
- data=None,
- )
-
- def __call__(self, filename: str, contents: str) -> AbilityResult:
- if result := self._check_preconditions(filename, contents):
- return result
-
- file_path = self._workspace.get_path(filename)
- try:
- directory = os.path.dirname(file_path)
- os.makedirs(directory)
- with open(filename, "w", encoding="utf-8") as f:
- f.write(contents)
- success = True
- message = f"File {file_path} written successfully."
- except IOError as e:
- success = False
- message = str(e)
-
- return AbilityResult(
- ability_name=self.name(),
- ability_args={"filename": filename},
- success=success,
- message=message,
- )