Merge pull request #1 from embedchain/add-simple-app

Add simple app & query functionality
Author: Taranjeet Singh
Date: 2023-06-20 16:39:08 +05:30
Committed by: GitHub
12 changed files with 297 additions and 0 deletions

1
embedchain/__init__.py Normal file

@@ -0,0 +1 @@
from .embedchain import App

0
embedchain/chunkers/__init__.py Normal file

27
embedchain/chunkers/base_chunker.py Normal file

@@ -0,0 +1,27 @@
import hashlib


class BaseChunker:
    def __init__(self, text_splitter):
        self.text_splitter = text_splitter

    def create_chunks(self, loader, url):
        documents = []
        ids = []
        datas = loader.load_data(url)
        metadatas = []
        for data in datas:
            content = data["content"]
            meta_data = data["meta_data"]
            chunks = self.text_splitter.split_text(content)
            url = meta_data["url"]
            for chunk in chunks:
                # Deterministic chunk ID: hash of the chunk text plus its source URL.
                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
                ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append(meta_data)
        return {
            "documents": documents,
            "ids": ids,
            "metadatas": metadatas,
        }
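A chunker only needs an object exposing load_data(url), so create_chunks can be exercised in isolation. A minimal sketch, assuming langchain is installed; StubLoader and its sample text are hypothetical stand-ins for the real loaders added later in this commit:

from embedchain.chunkers.website import WebsiteChunker

class StubLoader:
    # Hypothetical loader returning one document in the expected shape.
    def load_data(self, url):
        return [{"content": " ".join(f"word{i}" for i in range(400)), "meta_data": {"url": url}}]

chunks = WebsiteChunker().create_chunks(StubLoader(), "https://example.com")
# Three parallel lists, ready to pass straight to a Chroma collection.add(...).
print(len(chunks["documents"]), len(chunks["ids"]), len(chunks["metadatas"]))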

16
embedchain/chunkers/pdf_file.py Normal file

@@ -0,0 +1,16 @@
from embedchain.chunkers.base_chunker import BaseChunker
from langchain.text_splitter import RecursiveCharacterTextSplitter

TEXT_SPLITTER_CHUNK_PARAMS = {
    "chunk_size": 1000,
    "chunk_overlap": 0,
    "length_function": len,
}


class PdfFileChunker(BaseChunker):
    def __init__(self):
        text_splitter = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
        super().__init__(text_splitter)

16
embedchain/chunkers/website.py Normal file

@@ -0,0 +1,16 @@
from embedchain.chunkers.base_chunker import BaseChunker
from langchain.text_splitter import RecursiveCharacterTextSplitter

TEXT_SPLITTER_CHUNK_PARAMS = {
    "chunk_size": 500,
    "chunk_overlap": 0,
    "length_function": len,
}


class WebsiteChunker(BaseChunker):
    def __init__(self):
        text_splitter = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
        super().__init__(text_splitter)

16
embedchain/chunkers/youtube_video.py Normal file

@@ -0,0 +1,16 @@
from embedchain.chunkers.base_chunker import BaseChunker
from langchain.text_splitter import RecursiveCharacterTextSplitter

TEXT_SPLITTER_CHUNK_PARAMS = {
    "chunk_size": 2000,
    "chunk_overlap": 0,
    "length_function": len,
}


class YoutubeVideoChunker(BaseChunker):
    def __init__(self):
        text_splitter = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
        super().__init__(text_splitter)
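The three chunkers are identical apart from chunk_size: 1000 characters for PDFs, 500 for websites, 2000 for YouTube transcripts, all with zero overlap. The splitter's behavior is easy to inspect directly; a small sketch, assuming langchain is installed:

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0, length_function=len)
pieces = splitter.split_text("Lorem ipsum dolor sit amet. " * 60)
print([len(p) for p in pieces])  # every piece is at most 500 characters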

136
embedchain/embedchain.py Normal file

@@ -0,0 +1,136 @@
import os

import chromadb
import openai
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings

from embedchain.chunkers.pdf_file import PdfFileChunker
from embedchain.chunkers.website import WebsiteChunker
from embedchain.chunkers.youtube_video import YoutubeVideoChunker
from embedchain.loaders.pdf_file import PdfFileLoader
from embedchain.loaders.website import WebsiteLoader
from embedchain.loaders.youtube_video import YoutubeVideoLoader

load_dotenv()

embeddings = OpenAIEmbeddings()

ABS_PATH = os.getcwd()
DB_DIR = os.path.join(ABS_PATH, "db")

openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.getenv("OPENAI_API_KEY"),
    model_name="text-embedding-ada-002",
)


class EmbedChain:
    def __init__(self):
        self.chromadb_client = self._get_or_create_db()
        self.collection = self._get_or_create_collection()
        self.user_asks = []

    def _get_loader(self, data_type):
        loaders = {
            'youtube_video': YoutubeVideoLoader(),
            'pdf_file': PdfFileLoader(),
            'website': WebsiteLoader(),
        }
        if data_type in loaders:
            return loaders[data_type]
        else:
            raise ValueError(f"Unsupported data type: {data_type}")

    def _get_chunker(self, data_type):
        chunkers = {
            'youtube_video': YoutubeVideoChunker(),
            'pdf_file': PdfFileChunker(),
            'website': WebsiteChunker(),
        }
        if data_type in chunkers:
            return chunkers[data_type]
        else:
            raise ValueError(f"Unsupported data type: {data_type}")

    def add(self, data_type, url):
        loader = self._get_loader(data_type)
        chunker = self._get_chunker(data_type)
        self.user_asks.append([data_type, url])
        self.load_and_embed(loader, chunker, url)

    def _get_or_create_db(self):
        # Persist the index to ./db relative to the working directory.
        client_settings = chromadb.config.Settings(
            chroma_db_impl="duckdb+parquet",
            persist_directory=DB_DIR,
            anonymized_telemetry=False,
        )
        return chromadb.Client(client_settings)

    def _get_or_create_collection(self):
        return self.chromadb_client.get_or_create_collection(
            'embedchain_store', embedding_function=openai_ef,
        )

    def load_embeddings_to_db(self, loader, chunker, url):
        embeddings_data = chunker.create_chunks(loader, url)
        documents = embeddings_data["documents"]
        metadatas = embeddings_data["metadatas"]
        ids = embeddings_data["ids"]
        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids,
        )
        print(f"Docs count: {self.collection.count()}")

    def load_and_embed(self, loader, chunker, url):
        return self.load_embeddings_to_db(loader, chunker, url)

    def _format_result(self, results):
        # Convert a Chroma query result into (Document, distance) pairs.
        return [
            (Document(page_content=result[0], metadata=result[1] or {}), result[2])
            for result in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
        ]

    def get_openai_answer(self, prompt):
        messages = [{"role": "user", "content": prompt}]
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo-0613",
            messages=messages,
            temperature=0,
            max_tokens=1000,
            top_p=1,
        )
        return response["choices"][0]["message"]["content"]

    def get_answer_from_llm(self, query, context):
        prompt = f"""Use the following pieces of context to answer the query at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
{context}
Query: {query}
Helpful Answer:
"""
        answer = self.get_openai_answer(prompt)
        return answer

    def query(self, input_query):
        # Retrieve the single nearest chunk and use it as LLM context.
        result = self.collection.query(
            query_texts=[input_query],
            n_results=1,
        )
        result_formatted = self._format_result(result)
        answer = self.get_answer_from_llm(input_query, result_formatted[0][0].page_content)
        return answer


class App(EmbedChain):
    pass
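Together these pieces form the whole pipeline: add() resolves a loader and chunker by data type and embeds the resulting chunks into Chroma, and query() retrieves the single nearest chunk (n_results=1) to use as context for the chat model. Note that openai.ChatCompletion.create is the pre-1.0 OpenAI SDK interface, consistent with this commit's date; openai>=1.0 removed it in favor of client.chat.completions.create. A minimal usage sketch, assuming OPENAI_API_KEY is set in the environment or a .env file (the URLs are placeholders):

from embedchain import App

app = App()
# Any web page, PDF path/URL, or YouTube URL can be added.
app.add("website", "https://example.com")
app.add("youtube_video", "https://www.youtube.com/watch?v=placeholder")

print(app.query("What is this content about?"))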

0
embedchain/loaders/__init__.py Normal file

23
embedchain/loaders/pdf_file.py Normal file

@@ -0,0 +1,23 @@
from langchain.document_loaders import PyPDFLoader

from embedchain.utils import clean_string


class PdfFileLoader:
    def load_data(self, url):
        loader = PyPDFLoader(url)
        output = []
        pages = loader.load_and_split()
        if not len(pages):
            raise ValueError("No data found")
        for page in pages:
            content = page.page_content
            content = clean_string(content)
            meta_data = page.metadata
            meta_data["url"] = url
            output.append({
                "content": content,
                "meta_data": meta_data,
            })
        return output
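Every loader returns the same shape, a list of dicts with "content" and "meta_data" keys (one per PDF page here), which is exactly what BaseChunker.create_chunks consumes. A quick sketch, with a hypothetical local file name:

from embedchain.loaders.pdf_file import PdfFileLoader

pages = PdfFileLoader().load_data("paper.pdf")  # hypothetical local PDF
print(pages[0].keys())        # dict_keys(['content', 'meta_data'])
print(pages[0]["meta_data"])  # PyPDF page metadata plus the injected "url" key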

30
embedchain/loaders/website.py Normal file

@@ -0,0 +1,30 @@
import requests
from bs4 import BeautifulSoup

from embedchain.utils import clean_string


class WebsiteLoader:
    def load_data(self, url):
        response = requests.get(url)
        data = response.content
        soup = BeautifulSoup(data, 'html.parser')
        # Replace the contents of boilerplate tags with a single space
        # so that get_text() below skips them.
        for tag in soup([
            "nav", "aside", "form", "header",
            "noscript", "svg", "canvas",
            "footer", "script", "style",
        ]):
            tag.string = " "
        output = []
        content = soup.get_text()
        content = clean_string(content)
        meta_data = {
            "url": url,
        }
        output.append({
            "content": content,
            "meta_data": meta_data,
        })
        return output
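Assigning tag.string = " " replaces each boilerplate tag's contents with a single space before get_text() runs, so navigation, scripts, and styles never reach the extracted text. The effect is easy to verify on a toy document; a minimal sketch, assuming beautifulsoup4 is installed:

from bs4 import BeautifulSoup

html = "<html><nav>Menu</nav><p>Actual article text.</p><script>var x = 1;</script></html>"
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["nav", "script"]):
    tag.string = " "
print(soup.get_text().strip())  # "Actual article text." -- nav/script contents are gone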

22
embedchain/loaders/youtube_video.py Normal file

@@ -0,0 +1,22 @@
from langchain.document_loaders import YoutubeLoader

from embedchain.utils import clean_string


class YoutubeVideoLoader:
    def load_data(self, url):
        loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
        doc = loader.load()
        output = []
        if not len(doc):
            raise ValueError("No data found")
        content = doc[0].page_content
        content = clean_string(content)
        meta_data = doc[0].metadata
        meta_data["url"] = url
        output.append({
            "content": content,
            "meta_data": meta_data,
        })
        return output
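Transcript fetching is delegated to LangChain's YoutubeLoader, with add_video_info=True so video details such as the title land in the metadata. Usage mirrors the other loaders (placeholder URL):

from embedchain.loaders.youtube_video import YoutubeVideoLoader

docs = YoutubeVideoLoader().load_data("https://www.youtube.com/watch?v=placeholder")
print(docs[0]["meta_data"])  # video info from LangChain plus the injected "url" key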

10
embedchain/utils.py Normal file

@@ -0,0 +1,10 @@
import re


def clean_string(text):
    text = text.replace('\n', ' ')
    # Collapse runs of whitespace into single spaces.
    cleaned_text = re.sub(r'\s+', ' ', text.strip())
    cleaned_text = cleaned_text.replace('\\', '')
    cleaned_text = cleaned_text.replace('#', ' ')
    # Collapse repeated punctuation, e.g. "!!!" -> "!".
    cleaned_text = re.sub(r'([^\w\s])\1*', r'\1', cleaned_text)
    return cleaned_text
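The final substitution collapses any run of a repeated punctuation character down to a single occurrence, since ([^\w\s])\1* matches a non-word, non-space character followed by zero or more repeats of itself. A worked example:

from embedchain.utils import clean_string

print(clean_string("Hello!!!   World\n-- fine"))  # "Hello! World - fine"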