Files
t6_mem0/embedchain/loaders/substack.py
2023-11-15 15:33:16 -08:00

87 lines
3.0 KiB
Python

import hashlib
import logging
import time
import requests
from embedchain.helper.json_serializable import register_deserializable
from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils import is_readable
@register_deserializable
class SubstackLoader(BaseLoader):
"""
This method takes a sitemap URL as input and retrieves
all the URLs to use the WebPageLoader to load content
of each page.
"""
def load_data(self, url: str):
try:
from bs4 import BeautifulSoup
from bs4.builder import ParserRejectedMarkup
except ImportError:
raise ImportError(
'Substack requires extra dependencies. Install with `pip install --upgrade "embedchain[dataloaders]"`'
) from None
output = []
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "xml")
links = [link.text for link in soup.find_all("loc") if link.parent.name == "url" and "/p/" in link.text]
if len(links) == 0:
links = [link.text for link in soup.find_all("loc") if "/p/" in link.text]
doc_id = hashlib.sha256((" ".join(links) + url).encode()).hexdigest()
def serialize_response(soup: BeautifulSoup):
data = {}
h1_els = soup.find_all("h1")
if h1_els is not None and len(h1_els) > 0:
data["title"] = h1_els[1].text
description_el = soup.find("meta", {"name": "description"})
if description_el is not None:
data["description"] = description_el["content"]
content_el = soup.find("div", {"class": "available-content"})
if content_el is not None:
data["content"] = content_el.text
like_btn = soup.find("div", {"class": "like-button-container"})
if like_btn is not None:
no_of_likes_div = like_btn.find("div", {"class": "label"})
if no_of_likes_div is not None:
data["no_of_likes"] = no_of_likes_div.text
return data
def load_link(link: str):
try:
each_load_data = requests.get(link)
each_load_data.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
data = serialize_response(soup)
data = str(data)
if is_readable(data):
return data
else:
logging.warning(f"Page is not readable (too many invalid characters): {link}")
except ParserRejectedMarkup as e:
logging.error(f"Failed to parse {link}: {e}")
return None
for link in links:
data = load_link(link)
if data:
output.append({"content": data, "meta_data": {"url": link}})
# TODO: allow users to configure this
time.sleep(0.4) # added to avoid rate limiting
return {"doc_id": doc_id, "data": output}