aboutsummaryrefslogtreecommitdiff
path: root/autogpts/autogpt/autogpt/commands/file_operations_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'autogpts/autogpt/autogpt/commands/file_operations_utils.py')
-rw-r--r--autogpts/autogpt/autogpt/commands/file_operations_utils.py116
1 files changed, 49 insertions, 67 deletions
diff --git a/autogpts/autogpt/autogpt/commands/file_operations_utils.py b/autogpts/autogpt/autogpt/commands/file_operations_utils.py
index bbf3706aa..da65bafaa 100644
--- a/autogpts/autogpt/autogpt/commands/file_operations_utils.py
+++ b/autogpts/autogpt/autogpt/commands/file_operations_utils.py
@@ -1,12 +1,11 @@
import json
import logging
-import os
-from pathlib import Path
+from abc import ABC, abstractmethod
+from typing import BinaryIO
import charset_normalizer
import docx
-import markdown
-import PyPDF2
+import pypdf
import yaml
from bs4 import BeautifulSoup
from pylatexenc.latex2text import LatexNodes2Text
@@ -14,23 +13,27 @@ from pylatexenc.latex2text import LatexNodes2Text
logger = logging.getLogger(__name__)
-class ParserStrategy:
- def read(self, file_path: Path) -> str:
- raise NotImplementedError
+class ParserStrategy(ABC):
+ @abstractmethod
+ def read(self, file: BinaryIO) -> str:
+ ...
# Basic text file reading
class TXTParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- charset_match = charset_normalizer.from_path(file_path).best()
- logger.debug(f"Reading '{file_path}' with encoding '{charset_match.encoding}'")
+ def read(self, file: BinaryIO) -> str:
+ charset_match = charset_normalizer.from_bytes(file.read()).best()
+ logger.debug(
+ f"Reading {getattr(file, 'name', 'file')} "
+ f"with encoding '{charset_match.encoding}'"
+ )
return str(charset_match)
# Reading text from binary file using pdf parser
class PDFParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- parser = PyPDF2.PdfReader(file_path)
+ def read(self, file: BinaryIO) -> str:
+ parser = pypdf.PdfReader(file)
text = ""
for page_idx in range(len(parser.pages)):
text += parser.pages[page_idx].extract_text()
@@ -39,8 +42,8 @@ class PDFParser(ParserStrategy):
# Reading text from binary file using docx parser
class DOCXParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- doc_file = docx.Document(file_path)
+ def read(self, file: BinaryIO) -> str:
+ doc_file = docx.Document(file)
text = ""
for para in doc_file.paragraphs:
text += para.text
@@ -49,50 +52,37 @@ class DOCXParser(ParserStrategy):
# Reading as dictionary and returning string format
class JSONParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- data = json.load(f)
- text = str(data)
+ def read(self, file: BinaryIO) -> str:
+ data = json.load(file)
+ text = str(data)
return text
class XMLParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- soup = BeautifulSoup(f, "xml")
- text = soup.get_text()
+ def read(self, file: BinaryIO) -> str:
+ soup = BeautifulSoup(file, "xml")
+ text = soup.get_text()
return text
# Reading as dictionary and returning string format
class YAMLParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- data = yaml.load(f, Loader=yaml.FullLoader)
- text = str(data)
+ def read(self, file: BinaryIO) -> str:
+ data = yaml.load(file, Loader=yaml.FullLoader)
+ text = str(data)
return text
class HTMLParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- soup = BeautifulSoup(f, "html.parser")
- text = soup.get_text()
- return text
-
-
-class MarkdownParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- html = markdown.markdown(f.read())
- text = "".join(BeautifulSoup(html, "html.parser").findAll(string=True))
+ def read(self, file: BinaryIO) -> str:
+ soup = BeautifulSoup(file, "html.parser")
+ text = soup.get_text()
return text
class LaTeXParser(ParserStrategy):
- def read(self, file_path: Path) -> str:
- with open(file_path, "r") as f:
- latex = f.read()
+ def read(self, file: BinaryIO) -> str:
+ latex = file.read().decode()
text = LatexNodes2Text().latex_to_text(latex)
return text
@@ -106,13 +96,17 @@ class FileContext:
self.logger.debug(f"Setting Context Parser to {parser}")
self.parser = parser
- def read_file(self, file_path) -> str:
- self.logger.debug(f"Reading file {file_path} with parser {self.parser}")
- return self.parser.read(file_path)
+ def decode_file(self, file: BinaryIO) -> str:
+ self.logger.debug(
+ f"Reading {getattr(file, 'name', 'file')} with parser {self.parser}"
+ )
+ return self.parser.read(file)
extension_to_parser = {
".txt": TXTParser(),
+ ".md": TXTParser(),
+ ".markdown": TXTParser(),
".csv": TXTParser(),
".pdf": PDFParser(),
".docx": DOCXParser(),
@@ -123,47 +117,35 @@ extension_to_parser = {
".html": HTMLParser(),
".htm": HTMLParser(),
".xhtml": HTMLParser(),
- ".md": MarkdownParser(),
- ".markdown": MarkdownParser(),
".tex": LaTeXParser(),
}
-def is_file_binary_fn(file_path: Path):
+def is_file_binary_fn(file: BinaryIO):
"""Given a binary file object, read all of its content and check whether a null byte is present
Args:
- file_path (_type_): _description_
+ file (BinaryIO): binary stream to scan; it is rewound to offset 0 after being read
Returns:
bool: is_binary
"""
- with open(file_path, "rb") as f:
- file_data = f.read()
+ file_data = file.read()
+ file.seek(0)
if b"\x00" in file_data:
return True
return False
-def read_textual_file(file_path: Path, logger: logging.Logger) -> str:
- if not file_path.is_absolute():
- raise ValueError("File path must be absolute")
-
- if not file_path.is_file():
- if not file_path.exists():
- raise FileNotFoundError(
- f"read_file {file_path} failed: no such file or directory"
- )
- else:
- raise ValueError(f"read_file failed: {file_path} is not a file")
+def decode_textual_file(file: BinaryIO, ext: str, logger: logging.Logger) -> str:
+ if not file.readable():
+ raise ValueError(f"{repr(file)} is not readable")
- is_binary = is_file_binary_fn(file_path)
- file_extension = os.path.splitext(file_path)[1].lower()
- parser = extension_to_parser.get(file_extension)
+ parser = extension_to_parser.get(ext.lower())
if not parser:
- if is_binary:
- raise ValueError(f"Unsupported binary file format: {file_extension}")
+ if is_file_binary_fn(file):
+ raise ValueError(f"Unsupported binary file format: {ext}")
# fallback to txt file parser (to support script and code files loading)
parser = TXTParser()
file_context = FileContext(parser, logger)
- return file_context.read_file(file_path)
+ return file_context.decode_file(file)