[Improvement] Parallelize loading of sitemap urls

This commit is contained in:
Deshraj Yadav
2023-11-13 12:53:34 -08:00
parent 1d31b8f7e4
commit a5bf8e9075
3 changed files with 23 additions and 12 deletions

View File

@@ -1,3 +1,4 @@
import concurrent.futures
import hashlib import hashlib
import logging import logging
@@ -20,32 +21,38 @@ from embedchain.utils import is_readable
@register_deserializable @register_deserializable
class SitemapLoader(BaseLoader): class SitemapLoader(BaseLoader):
def load_data(self, sitemap_url): def load_data(self, sitemap_url):
"""
This method takes a sitemap URL as input and retrieves
all the URLs to use the WebPageLoader to load content
of each page.
"""
output = [] output = []
web_page_loader = WebPageLoader() web_page_loader = WebPageLoader()
response = requests.get(sitemap_url) response = requests.get(sitemap_url)
response.raise_for_status() response.raise_for_status()
soup = BeautifulSoup(response.text, "xml") soup = BeautifulSoup(response.text, "xml")
links = [link.text for link in soup.find_all("loc") if link.parent.name == "url"] links = [link.text for link in soup.find_all("loc") if link.parent.name == "url"]
if len(links) == 0: if len(links) == 0:
# Get all <loc> tags as a fallback. This might include images.
links = [link.text for link in soup.find_all("loc")] links = [link.text for link in soup.find_all("loc")]
doc_id = hashlib.sha256((" ".join(links) + sitemap_url).encode()).hexdigest() doc_id = hashlib.sha256((" ".join(links) + sitemap_url).encode()).hexdigest()
for link in links: def load_link(link):
try: try:
each_load_data = web_page_loader.load_data(link) each_load_data = web_page_loader.load_data(link)
if is_readable(each_load_data.get("data")[0].get("content")): if is_readable(each_load_data.get("data")[0].get("content")):
output.append(each_load_data.get("data")) return each_load_data.get("data")
else: else:
logging.warning(f"Page is not readable (too many invalid characters): {link}") logging.warning(f"Page is not readable (too many invalid characters): {link}")
except ParserRejectedMarkup as e: except ParserRejectedMarkup as e:
logging.error(f"Failed to parse {link}: {e}") logging.error(f"Failed to parse {link}: {e}")
return {"doc_id": doc_id, "data": [data[0] for data in output]} return None
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_link = {executor.submit(load_link, link): link for link in links}
for future in concurrent.futures.as_completed(future_to_link):
link = future_to_link[future]
try:
data = future.result()
if data:
output.append(data)
except Exception as e:
logging.error(f"Error loading page {link}: {e}")
return {"doc_id": doc_id, "data": [data[0] for data in output if data]}

View File

@@ -158,7 +158,11 @@ class ChromaDB(BaseVectorDB):
) )
for i in range(0, len(documents), self.BATCH_SIZE): for i in range(0, len(documents), self.BATCH_SIZE):
print("Inserting batches from {} to {} in chromadb".format(i, min(len(documents), i + self.BATCH_SIZE))) print(
"Inserting batches from {} to {} in vector database.".format(
i, min(len(documents), i + self.BATCH_SIZE)
)
)
if skip_embedding: if skip_embedding:
self.collection.add( self.collection.add(
embeddings=embeddings[i : i + self.BATCH_SIZE], embeddings=embeddings[i : i + self.BATCH_SIZE],

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "embedchain" name = "embedchain"
version = "0.1.9" version = "0.1.10"
description = "Data platform for LLMs - Load, index, retrieve and sync any unstructured data" description = "Data platform for LLMs - Load, index, retrieve and sync any unstructured data"
authors = [ authors = [
"Taranjeet Singh <taranjeet@embedchain.ai>", "Taranjeet Singh <taranjeet@embedchain.ai>",