diff options
Diffstat (limited to 'autogpts/autogpt/autogpt/commands/file_operations_utils.py')
-rw-r--r-- | autogpts/autogpt/autogpt/commands/file_operations_utils.py | 116 |
1 files changed, 49 insertions, 67 deletions
diff --git a/autogpts/autogpt/autogpt/commands/file_operations_utils.py b/autogpts/autogpt/autogpt/commands/file_operations_utils.py index bbf3706aa..da65bafaa 100644 --- a/autogpts/autogpt/autogpt/commands/file_operations_utils.py +++ b/autogpts/autogpt/autogpt/commands/file_operations_utils.py @@ -1,12 +1,11 @@ import json import logging -import os -from pathlib import Path +from abc import ABC, abstractmethod +from typing import BinaryIO import charset_normalizer import docx -import markdown -import PyPDF2 +import pypdf import yaml from bs4 import BeautifulSoup from pylatexenc.latex2text import LatexNodes2Text @@ -14,23 +13,27 @@ from pylatexenc.latex2text import LatexNodes2Text logger = logging.getLogger(__name__) -class ParserStrategy: - def read(self, file_path: Path) -> str: - raise NotImplementedError +class ParserStrategy(ABC): + @abstractmethod + def read(self, file: BinaryIO) -> str: + ... # Basic text file reading class TXTParser(ParserStrategy): - def read(self, file_path: Path) -> str: - charset_match = charset_normalizer.from_path(file_path).best() - logger.debug(f"Reading '{file_path}' with encoding '{charset_match.encoding}'") + def read(self, file: BinaryIO) -> str: + charset_match = charset_normalizer.from_bytes(file.read()).best() + logger.debug( + f"Reading {getattr(file, 'name', 'file')} " + f"with encoding '{charset_match.encoding}'" + ) return str(charset_match) # Reading text from binary file using pdf parser class PDFParser(ParserStrategy): - def read(self, file_path: Path) -> str: - parser = PyPDF2.PdfReader(file_path) + def read(self, file: BinaryIO) -> str: + parser = pypdf.PdfReader(file) text = "" for page_idx in range(len(parser.pages)): text += parser.pages[page_idx].extract_text() @@ -39,8 +42,8 @@ class PDFParser(ParserStrategy): # Reading text from binary file using docs parser class DOCXParser(ParserStrategy): - def read(self, file_path: Path) -> str: - doc_file = docx.Document(file_path) + def read(self, file: BinaryIO) -> str: + doc_file = docx.Document(file) text = "" for para in doc_file.paragraphs: text += para.text @@ -49,50 +52,37 @@ class DOCXParser(ParserStrategy): # Reading as dictionary and returning string format class JSONParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - data = json.load(f) - text = str(data) + def read(self, file: BinaryIO) -> str: + data = json.load(file) + text = str(data) return text class XMLParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - soup = BeautifulSoup(f, "xml") - text = soup.get_text() + def read(self, file: BinaryIO) -> str: + soup = BeautifulSoup(file, "xml") + text = soup.get_text() return text # Reading as dictionary and returning string format class YAMLParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - data = yaml.load(f, Loader=yaml.FullLoader) - text = str(data) + def read(self, file: BinaryIO) -> str: + data = yaml.load(file, Loader=yaml.FullLoader) + text = str(data) return text class HTMLParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - soup = BeautifulSoup(f, "html.parser") - text = soup.get_text() - return text - - -class MarkdownParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - html = markdown.markdown(f.read()) - text = "".join(BeautifulSoup(html, "html.parser").findAll(string=True)) + def read(self, file: BinaryIO) -> str: + soup = BeautifulSoup(file, "html.parser") + text = soup.get_text() return text class LaTeXParser(ParserStrategy): - def read(self, file_path: Path) -> str: - with open(file_path, "r") as f: - latex = f.read() + def read(self, file: BinaryIO) -> str: + latex = file.read().decode() text = LatexNodes2Text().latex_to_text(latex) return text @@ -106,13 +96,17 @@ class FileContext: self.logger.debug(f"Setting Context Parser to {parser}") self.parser = parser - def read_file(self, file_path) -> str: - self.logger.debug(f"Reading file {file_path} with parser {self.parser}") - return self.parser.read(file_path) + def decode_file(self, file: BinaryIO) -> str: + self.logger.debug( + f"Reading {getattr(file, 'name', 'file')} with parser {self.parser}" + ) + return self.parser.read(file) extension_to_parser = { ".txt": TXTParser(), + ".md": TXTParser(), + ".markdown": TXTParser(), ".csv": TXTParser(), ".pdf": PDFParser(), ".docx": DOCXParser(), @@ -123,47 +117,35 @@ extension_to_parser = { ".html": HTMLParser(), ".htm": HTMLParser(), ".xhtml": HTMLParser(), - ".md": MarkdownParser(), - ".markdown": MarkdownParser(), ".tex": LaTeXParser(), } -def is_file_binary_fn(file_path: Path): +def is_file_binary_fn(file: BinaryIO): """Given a file path load all its content and checks if the null bytes is present Args: - file_path (_type_): _description_ + file (_type_): _description_ Returns: bool: is_binary """ - with open(file_path, "rb") as f: - file_data = f.read() + file_data = file.read() + file.seek(0) if b"\x00" in file_data: return True return False -def read_textual_file(file_path: Path, logger: logging.Logger) -> str: - if not file_path.is_absolute(): - raise ValueError("File path must be absolute") - - if not file_path.is_file(): - if not file_path.exists(): - raise FileNotFoundError( - f"read_file {file_path} failed: no such file or directory" - ) - else: - raise ValueError(f"read_file failed: {file_path} is not a file") +def decode_textual_file(file: BinaryIO, ext: str, logger: logging.Logger) -> str: + if not file.readable(): + raise ValueError(f"{repr(file)} is not readable") - is_binary = is_file_binary_fn(file_path) - file_extension = os.path.splitext(file_path)[1].lower() - parser = extension_to_parser.get(file_extension) + parser = extension_to_parser.get(ext.lower()) if not parser: - if is_binary: - raise ValueError(f"Unsupported binary file format: {file_extension}") + if is_file_binary_fn(file): + raise ValueError(f"Unsupported binary file format: {ext}") # fallback to txt file parser (to support script and code files loading) parser = TXTParser() file_context = FileContext(parser, logger) - return file_context.read_file(file_path) + return file_context.decode_file(file) |