diff options
Diffstat (limited to 'autogpts/forge/forge/actions/web/web_selenium.py')
-rw-r--r-- | autogpts/forge/forge/actions/web/web_selenium.py | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/autogpts/forge/forge/actions/web/web_selenium.py b/autogpts/forge/forge/actions/web/web_selenium.py new file mode 100644 index 000000000..9e3c7494b --- /dev/null +++ b/autogpts/forge/forge/actions/web/web_selenium.py @@ -0,0 +1,366 @@ +"""Commands for browsing a website""" + +from __future__ import annotations + +COMMAND_CATEGORY = "web_browse" +COMMAND_CATEGORY_TITLE = "Web Browsing" + +import functools +import logging +import re +from pathlib import Path +from sys import platform +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type +from urllib.parse import urljoin, urlparse + +from bs4 import BeautifulSoup +from requests.compat import urljoin +from selenium.common.exceptions import WebDriverException +from selenium.webdriver.chrome.options import Options as ChromeOptions +from selenium.webdriver.chrome.service import Service as ChromeDriverService +from selenium.webdriver.chrome.webdriver import WebDriver as ChromeDriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.options import ArgOptions as BrowserOptions +from selenium.webdriver.edge.options import Options as EdgeOptions +from selenium.webdriver.edge.service import Service as EdgeDriverService +from selenium.webdriver.edge.webdriver import WebDriver as EdgeDriver +from selenium.webdriver.firefox.options import Options as FirefoxOptions +from selenium.webdriver.firefox.service import Service as GeckoDriverService +from selenium.webdriver.firefox.webdriver import WebDriver as FirefoxDriver +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.safari.options import Options as SafariOptions +from selenium.webdriver.safari.webdriver import WebDriver as SafariDriver +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait +from webdriver_manager.chrome import ChromeDriverManager +from webdriver_manager.firefox import GeckoDriverManager +from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager + +from forge.sdk.errors import CommandExecutionError + +from ..registry import action + + +def extract_hyperlinks(soup: BeautifulSoup, base_url: str) -> list[tuple[str, str]]: + """Extract hyperlinks from a BeautifulSoup object + + Args: + soup (BeautifulSoup): The BeautifulSoup object + base_url (str): The base URL + + Returns: + List[Tuple[str, str]]: The extracted hyperlinks + """ + return [ + (link.text, urljoin(base_url, link["href"])) + for link in soup.find_all("a", href=True) + ] + + +def format_hyperlinks(hyperlinks: list[tuple[str, str]]) -> list[str]: + """Format hyperlinks to be displayed to the user + + Args: + hyperlinks (List[Tuple[str, str]]): The hyperlinks to format + + Returns: + List[str]: The formatted hyperlinks + """ + return [f"{link_text} ({link_url})" for link_text, link_url in hyperlinks] + + +def validate_url(func: Callable[..., Any]) -> Any: + """The method decorator validate_url is used to validate urls for any command that requires + a url as an argument""" + + @functools.wraps(func) + def wrapper(url: str, *args, **kwargs) -> Any: + """Check if the URL is valid using a basic check, urllib check, and local file check + + Args: + url (str): The URL to check + + Returns: + the result of the wrapped function + + Raises: + ValueError if the url fails any of the validation tests + """ + # Most basic check if the URL is valid: + if not re.match(r"^https?://", url): + raise ValueError("Invalid URL format") + if not is_valid_url(url): + raise ValueError("Missing Scheme or Network location") + # Restrict access to local files + if check_local_file_access(url): + raise ValueError("Access to local files is restricted") + # Check URL length + if len(url) > 2000: + raise ValueError("URL is too long") + + return func(sanitize_url(url), *args, **kwargs) + + return wrapper + + +def is_valid_url(url: str) -> bool: + """Check if the URL is valid + + Args: + url (str): The URL to check + + Returns: + bool: True if the URL is valid, False otherwise + """ + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) + except ValueError: + return False + + +def sanitize_url(url: str) -> str: + """Sanitize the URL + + Args: + url (str): The URL to sanitize + + Returns: + str: The sanitized URL + """ + parsed_url = urlparse(url) + reconstructed_url = f"{parsed_url.path}{parsed_url.params}?{parsed_url.query}" + return urljoin(url, reconstructed_url) + + +def check_local_file_access(url: str) -> bool: + """Check if the URL is a local file + + Args: + url (str): The URL to check + + Returns: + bool: True if the URL is a local file, False otherwise + """ + local_prefixes = [ + "file:///", + "file://localhost/", + "file://localhost", + "http://localhost", + "http://localhost/", + "https://localhost", + "https://localhost/", + "http://2130706433", + "http://2130706433/", + "https://2130706433", + "https://2130706433/", + "http://127.0.0.1/", + "http://127.0.0.1", + "https://127.0.0.1/", + "https://127.0.0.1", + "https://0.0.0.0/", + "https://0.0.0.0", + "http://0.0.0.0/", + "http://0.0.0.0", + "http://0000", + "http://0000/", + "https://0000", + "https://0000/", + ] + return any(url.startswith(prefix) for prefix in local_prefixes) + + +logger = logging.getLogger(__name__) + +FILE_DIR = Path(__file__).parent.parent +TOKENS_TO_TRIGGER_SUMMARY = 50 +LINKS_TO_RETURN = 20 + + +class BrowsingError(CommandExecutionError): + """An error occurred while trying to browse the page""" + + +@action( + name="read_webpage", + description="Read a webpage, and extract specific information from it if a question is specified. If you are looking to extract specific information from the webpage, you should specify a question.", + parameters=[ + { + "name": "url", + "description": "The URL to visit", + "type": "string", + "required": True, + }, + { + "name": "question", + "description": "A question that you want to answer using the content of the webpage.", + "type": "string", + "required": False, + }, + ], + output_type="string", +) +@validate_url +async def read_webpage( + agent, task_id: str, url: str, question: str = "" +) -> Tuple(str, List[str]): + """Browse a website and return the answer and links to the user + + Args: + url (str): The url of the website to browse + question (str): The question to answer using the content of the webpage + + Returns: + str: The answer and links to the user and the webdriver + """ + driver = None + try: + driver = open_page_in_browser(url) + + text = scrape_text_with_selenium(driver) + links = scrape_links_with_selenium(driver, url) + + if not text: + return f"Website did not contain any text.\n\nLinks: {links}" + + # Limit links to LINKS_TO_RETURN + if len(links) > LINKS_TO_RETURN: + links = links[:LINKS_TO_RETURN] + return (text, links) + + except WebDriverException as e: + # These errors are often quite long and include lots of context. + # Just grab the first line. + msg = e.msg.split("\n")[0] + if "net::" in msg: + raise BrowsingError( + f"A networking error occurred while trying to load the page: " + + re.sub(r"^unknown error: ", "", msg) + ) + raise CommandExecutionError(msg) + finally: + if driver: + close_browser(driver) + + +def scrape_text_with_selenium(driver: WebDriver) -> str: + """Scrape text from a browser window using selenium + + Args: + driver (WebDriver): A driver object representing the browser window to scrape + + Returns: + str: the text scraped from the website + """ + + # Get the HTML content directly from the browser's DOM + page_source = driver.execute_script("return document.body.outerHTML;") + soup = BeautifulSoup(page_source, "html.parser") + + for script in soup(["script", "style"]): + script.extract() + + text = soup.get_text() + lines = (line.strip() for line in text.splitlines()) + chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) + text = "\n".join(chunk for chunk in chunks if chunk) + return text + + +def scrape_links_with_selenium(driver: WebDriver, base_url: str) -> list[str]: + """Scrape links from a website using selenium + + Args: + driver (WebDriver): A driver object representing the browser window to scrape + base_url (str): The base URL to use for resolving relative links + + Returns: + List[str]: The links scraped from the website + """ + page_source = driver.page_source + soup = BeautifulSoup(page_source, "html.parser") + + for script in soup(["script", "style"]): + script.extract() + + hyperlinks = extract_hyperlinks(soup, base_url) + + return format_hyperlinks(hyperlinks) + + +def open_page_in_browser(url: str) -> WebDriver: + """Open a browser window and load a web page using Selenium + + Params: + url (str): The URL of the page to load + + Returns: + driver (WebDriver): A driver object representing the browser window to scrape + """ + logging.getLogger("selenium").setLevel(logging.CRITICAL) + selenium_web_browser = "chrome" + selenium_headless = True + options_available: dict[str, Type[BrowserOptions]] = { + "chrome": ChromeOptions, + "edge": EdgeOptions, + "firefox": FirefoxOptions, + "safari": SafariOptions, + } + + options: BrowserOptions = options_available[selenium_web_browser]() + options.add_argument( + "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36" + ) + + if selenium_web_browser == "firefox": + if selenium_headless: + options.headless = True + options.add_argument("--disable-gpu") + driver = FirefoxDriver( + service=GeckoDriverService(GeckoDriverManager().install()), options=options + ) + elif selenium_web_browser == "edge": + driver = EdgeDriver( + service=EdgeDriverService(EdgeDriverManager().install()), options=options + ) + elif selenium_web_browser == "safari": + # Requires a bit more setup on the users end + # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari + driver = SafariDriver(options=options) + else: + if platform == "linux" or platform == "linux2": + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--remote-debugging-port=9222") + + options.add_argument("--no-sandbox") + if selenium_headless: + options.add_argument("--headless=new") + options.add_argument("--disable-gpu") + + chromium_driver_path = Path("/usr/bin/chromedriver") + + driver = ChromeDriver( + service=ChromeDriverService(str(chromium_driver_path)) + if chromium_driver_path.exists() + else ChromeDriverService(ChromeDriverManager().install()), + options=options, + ) + driver.get(url) + + WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + + return driver + + +def close_browser(driver: WebDriver) -> None: + """Close the browser + + Args: + driver (WebDriver): The webdriver to close + + Returns: + None + """ + driver.quit() |