Diffstat (limited to 'autogpts/forge/forge/actions/web/web_selenium.py')
-rw-r--r--  autogpts/forge/forge/actions/web/web_selenium.py  |  366
1 file changed, 366 insertions(+), 0 deletions(-)
diff --git a/autogpts/forge/forge/actions/web/web_selenium.py b/autogpts/forge/forge/actions/web/web_selenium.py
new file mode 100644
index 000000000..9e3c7494b
--- /dev/null
+++ b/autogpts/forge/forge/actions/web/web_selenium.py
@@ -0,0 +1,366 @@
+"""Commands for browsing a website"""
+
+from __future__ import annotations
+
+COMMAND_CATEGORY = "web_browse"
+COMMAND_CATEGORY_TITLE = "Web Browsing"
+
+import functools
+import logging
+import re
+from pathlib import Path
+from sys import platform
+from typing import Any, Callable, List, Tuple, Type
+from urllib.parse import urljoin, urlparse
+
+from bs4 import BeautifulSoup
+from selenium.common.exceptions import WebDriverException
+from selenium.webdriver.chrome.options import Options as ChromeOptions
+from selenium.webdriver.chrome.service import Service as ChromeDriverService
+from selenium.webdriver.chrome.webdriver import WebDriver as ChromeDriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.options import ArgOptions as BrowserOptions
+from selenium.webdriver.edge.options import Options as EdgeOptions
+from selenium.webdriver.edge.service import Service as EdgeDriverService
+from selenium.webdriver.edge.webdriver import WebDriver as EdgeDriver
+from selenium.webdriver.firefox.options import Options as FirefoxOptions
+from selenium.webdriver.firefox.service import Service as GeckoDriverService
+from selenium.webdriver.firefox.webdriver import WebDriver as FirefoxDriver
+from selenium.webdriver.remote.webdriver import WebDriver
+from selenium.webdriver.safari.options import Options as SafariOptions
+from selenium.webdriver.safari.webdriver import WebDriver as SafariDriver
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.wait import WebDriverWait
+from webdriver_manager.chrome import ChromeDriverManager
+from webdriver_manager.firefox import GeckoDriverManager
+from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager
+
+from forge.sdk.errors import CommandExecutionError
+
+from ..registry import action
+
+
+def extract_hyperlinks(soup: BeautifulSoup, base_url: str) -> list[tuple[str, str]]:
+ """Extract hyperlinks from a BeautifulSoup object
+
+ Args:
+ soup (BeautifulSoup): The BeautifulSoup object
+ base_url (str): The base URL
+
+ Returns:
+ List[Tuple[str, str]]: The extracted hyperlinks
+ """
+ return [
+ (link.text, urljoin(base_url, link["href"]))
+ for link in soup.find_all("a", href=True)
+ ]
+
+
+def format_hyperlinks(hyperlinks: list[tuple[str, str]]) -> list[str]:
+ """Format hyperlinks to be displayed to the user
+
+ Args:
+ hyperlinks (List[Tuple[str, str]]): The hyperlinks to format
+
+ Returns:
+ List[str]: The formatted hyperlinks
+ """
+ return [f"{link_text} ({link_url})" for link_text, link_url in hyperlinks]
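+
+# Together, extract_hyperlinks and format_hyperlinks turn an anchor such as
+# <a href="/docs">Docs</a> found on https://example.com/ into the display
+# string "Docs (https://example.com/docs)".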
+
+
+def validate_url(func: Callable[..., Any]) -> Any:
+    """Decorator that validates the ``url`` argument of a command. Forge
+    actions receive ``(agent, task_id, ...)`` before the URL, so the wrapper
+    mirrors that shape."""
+
+ @functools.wraps(func)
+    def wrapper(agent, task_id: str, url: str, *args, **kwargs) -> Any:
+ """Check if the URL is valid using a basic check, urllib check, and local file check
+
+ Args:
+ url (str): The URL to check
+
+ Returns:
+ the result of the wrapped function
+
+ Raises:
+ ValueError if the url fails any of the validation tests
+ """
+ # Most basic check if the URL is valid:
+ if not re.match(r"^https?://", url):
+ raise ValueError("Invalid URL format")
+ if not is_valid_url(url):
+ raise ValueError("Missing Scheme or Network location")
+ # Restrict access to local files
+ if check_local_file_access(url):
+ raise ValueError("Access to local files is restricted")
+ # Check URL length
+ if len(url) > 2000:
+ raise ValueError("URL is too long")
+
+        return func(agent, task_id, sanitize_url(url), *args, **kwargs)
+
+ return wrapper
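+
+# Illustrative use of @validate_url (hypothetical command, not part of this
+# module):
+#
+#     @validate_url
+#     def fetch(agent, task_id, url: str): ...
+#
+#     fetch(agent, task_id, "file:///etc/passwd")
+#     # -> ValueError("Invalid URL format"): the scheme is not http(s)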
+
+
+def is_valid_url(url: str) -> bool:
+ """Check if the URL is valid
+
+ Args:
+ url (str): The URL to check
+
+ Returns:
+ bool: True if the URL is valid, False otherwise
+ """
+ try:
+ result = urlparse(url)
+ return all([result.scheme, result.netloc])
+ except ValueError:
+ return False
+
+
+def sanitize_url(url: str) -> str:
+ """Sanitize the URL
+
+ Args:
+ url (str): The URL to sanitize
+
+ Returns:
+ str: The sanitized URL
+ """
+ parsed_url = urlparse(url)
+    reconstructed_url = parsed_url.path
+    if parsed_url.params:
+        reconstructed_url += f";{parsed_url.params}"
+    if parsed_url.query:
+        reconstructed_url += f"?{parsed_url.query}"
+    return urljoin(url, reconstructed_url)
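+
+# e.g. sanitize_url("https://example.com/a/b?q=1#frag") returns
+# "https://example.com/a/b?q=1": urljoin keeps the original scheme and host,
+# and the rebuilt relative part carries no "#fragment".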
+
+
+def check_local_file_access(url: str) -> bool:
+ """Check if the URL is a local file
+
+ Args:
+ url (str): The URL to check
+
+ Returns:
+ bool: True if the URL is a local file, False otherwise
+ """
+ local_prefixes = [
+ "file:///",
+ "file://localhost/",
+ "file://localhost",
+ "http://localhost",
+ "http://localhost/",
+ "https://localhost",
+ "https://localhost/",
+ "http://2130706433",
+ "http://2130706433/",
+ "https://2130706433",
+ "https://2130706433/",
+ "http://127.0.0.1/",
+ "http://127.0.0.1",
+ "https://127.0.0.1/",
+ "https://127.0.0.1",
+ "https://0.0.0.0/",
+ "https://0.0.0.0",
+ "http://0.0.0.0/",
+ "http://0.0.0.0",
+ "http://0000",
+ "http://0000/",
+ "https://0000",
+ "https://0000/",
+ ]
+ return any(url.startswith(prefix) for prefix in local_prefixes)
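+
+# Note: 2130706433 is 127.0.0.1 written as a single decimal integer
+# (0x7F000001), a spelling sometimes used to slip past naive localhost
+# filters; the prefix list above blocks it explicitly.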
+
+
+logger = logging.getLogger(__name__)
+
+FILE_DIR = Path(__file__).parent.parent
+TOKENS_TO_TRIGGER_SUMMARY = 50
+LINKS_TO_RETURN = 20
+
+
+class BrowsingError(CommandExecutionError):
+ """An error occurred while trying to browse the page"""
+
+
+@action(
+ name="read_webpage",
+    description="Read a webpage and, if a question is specified, extract the specific information needed to answer it.",
+ parameters=[
+ {
+ "name": "url",
+ "description": "The URL to visit",
+ "type": "string",
+ "required": True,
+ },
+ {
+ "name": "question",
+ "description": "A question that you want to answer using the content of the webpage.",
+ "type": "string",
+ "required": False,
+ },
+ ],
+ output_type="string",
+)
+@validate_url
+async def read_webpage(
+ agent, task_id: str, url: str, question: str = ""
+) -> Tuple[str, List[str]]:
+ """Browse a website and return the answer and links to the user
+
+ Args:
+ url (str): The url of the website to browse
+ question (str): The question to answer using the content of the webpage
+
+    Returns:
+        Tuple[str, List[str]]: The text scraped from the page and the links found on it
+ """
+ driver = None
+ try:
+ driver = open_page_in_browser(url)
+
+ text = scrape_text_with_selenium(driver)
+ links = scrape_links_with_selenium(driver, url)
+
+        if not text:
+            # keep the declared (text, links) return shape even when empty
+            return ("Website did not contain any text.", links)
+
+ # Limit links to LINKS_TO_RETURN
+ if len(links) > LINKS_TO_RETURN:
+ links = links[:LINKS_TO_RETURN]
+ return (text, links)
+
+ except WebDriverException as e:
+ # These errors are often quite long and include lots of context.
+ # Just grab the first line.
+ msg = e.msg.split("\n")[0]
+ if "net::" in msg:
+            raise BrowsingError(
+                "A networking error occurred while trying to load the page: "
+ + re.sub(r"^unknown error: ", "", msg)
+ )
+ raise CommandExecutionError(msg)
+ finally:
+ if driver:
+ close_browser(driver)
+
+
+def scrape_text_with_selenium(driver: WebDriver) -> str:
+ """Scrape text from a browser window using selenium
+
+ Args:
+ driver (WebDriver): A driver object representing the browser window to scrape
+
+ Returns:
+ str: the text scraped from the website
+ """
+
+ # Get the HTML content directly from the browser's DOM
+ page_source = driver.execute_script("return document.body.outerHTML;")
+ soup = BeautifulSoup(page_source, "html.parser")
+
+ for script in soup(["script", "style"]):
+ script.extract()
+
+ text = soup.get_text()
+ lines = (line.strip() for line in text.splitlines())
+    # runs of two spaces mark phrase boundaries left behind by the DOM
+    # flattening; each phrase gets its own output line
+    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
+ text = "\n".join(chunk for chunk in chunks if chunk)
+ return text
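+
+# Rough shape of the normalisation above, on a hypothetical input:
+#     "  Title  \n\n  One  Two  "  ->  "Title\nOne\nTwo"
+# (lines are stripped, double-space runs start a new line, empty chunks drop)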
+
+
+def scrape_links_with_selenium(driver: WebDriver, base_url: str) -> list[str]:
+ """Scrape links from a website using selenium
+
+ Args:
+ driver (WebDriver): A driver object representing the browser window to scrape
+ base_url (str): The base URL to use for resolving relative links
+
+ Returns:
+ List[str]: The links scraped from the website
+ """
+ page_source = driver.page_source
+ soup = BeautifulSoup(page_source, "html.parser")
+
+ for script in soup(["script", "style"]):
+ script.extract()
+
+ hyperlinks = extract_hyperlinks(soup, base_url)
+
+ return format_hyperlinks(hyperlinks)
+
+
+def open_page_in_browser(url: str) -> WebDriver:
+ """Open a browser window and load a web page using Selenium
+
+ Params:
+ url (str): The URL of the page to load
+
+ Returns:
+ driver (WebDriver): A driver object representing the browser window to scrape
+ """
+ logging.getLogger("selenium").setLevel(logging.CRITICAL)
+ selenium_web_browser = "chrome"
+ selenium_headless = True
+ options_available: dict[str, Type[BrowserOptions]] = {
+ "chrome": ChromeOptions,
+ "edge": EdgeOptions,
+ "firefox": FirefoxOptions,
+ "safari": SafariOptions,
+ }
+
+ options: BrowserOptions = options_available[selenium_web_browser]()
+ options.add_argument(
+ "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36"
+ )
+
+ if selenium_web_browser == "firefox":
+ if selenium_headless:
+ options.headless = True
+ options.add_argument("--disable-gpu")
+ driver = FirefoxDriver(
+ service=GeckoDriverService(GeckoDriverManager().install()), options=options
+ )
+ elif selenium_web_browser == "edge":
+ driver = EdgeDriver(
+ service=EdgeDriverService(EdgeDriverManager().install()), options=options
+ )
+ elif selenium_web_browser == "safari":
+        # Requires a bit more setup on the user's end
+ # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
+ driver = SafariDriver(options=options)
+ else:
+ if platform == "linux" or platform == "linux2":
+ options.add_argument("--disable-dev-shm-usage")
+ options.add_argument("--remote-debugging-port=9222")
+
+ options.add_argument("--no-sandbox")
+ if selenium_headless:
+ options.add_argument("--headless=new")
+ options.add_argument("--disable-gpu")
+
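+        # Prefer a system-installed chromedriver (e.g. from a distro package);
+        # otherwise fall back to webdriver_manager, which downloads a binary
+        # matching the installed browser.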
+ chromium_driver_path = Path("/usr/bin/chromedriver")
+
+ driver = ChromeDriver(
+ service=ChromeDriverService(str(chromium_driver_path))
+ if chromium_driver_path.exists()
+ else ChromeDriverService(ChromeDriverManager().install()),
+ options=options,
+ )
+ driver.get(url)
+
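+    # Block (up to 10 s) until the <body> element is present in the DOM, so
+    # the page has at least started rendering before callers scrape it.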
+ WebDriverWait(driver, 10).until(
+ EC.presence_of_element_located((By.TAG_NAME, "body"))
+ )
+
+ return driver
+
+
+def close_browser(driver: WebDriver) -> None:
+ """Close the browser
+
+ Args:
+ driver (WebDriver): The webdriver to close
+
+ Returns:
+ None
+ """
+ driver.quit()