Files
t6_mem0/embedchain/app.py

463 lines
18 KiB
Python

import ast
import json
import logging
import os
import sqlite3
import uuid
from typing import Any, Dict, Optional
import requests
import yaml
from embedchain.cache import (
Config,
ExactMatchEvaluation,
SearchDistanceEvaluation,
cache,
gptcache_data_manager,
gptcache_pre_function,
)
from embedchain.client import Client
from embedchain.config import AppConfig, CacheConfig, ChunkerConfig
from embedchain.constants import SQLITE_PATH
from embedchain.embedchain import EmbedChain
from embedchain.embedder.base import BaseEmbedder
from embedchain.embedder.openai import OpenAIEmbedder
from embedchain.factory import EmbedderFactory, LlmFactory, VectorDBFactory
from embedchain.helpers.json_serializable import register_deserializable
from embedchain.llm.base import BaseLlm
from embedchain.llm.openai import OpenAILlm
from embedchain.telemetry.posthog import AnonymousTelemetry
from embedchain.utils.misc import validate_config
from embedchain.vectordb.base import BaseVectorDB
from embedchain.vectordb.chroma import ChromaDB
# Set up the user directory if it doesn't exist already
Client.setup_dir()
@register_deserializable
class App(EmbedChain):
"""
EmbedChain App lets you create a LLM powered app for your unstructured
data by defining your chosen data source, embedding model,
and vector database.
"""
def __init__(
self,
id: str = None,
name: str = None,
config: AppConfig = None,
db: BaseVectorDB = None,
embedding_model: BaseEmbedder = None,
llm: BaseLlm = None,
config_data: dict = None,
log_level=logging.WARN,
auto_deploy: bool = False,
chunker: ChunkerConfig = None,
cache_config: CacheConfig = None,
):
"""
Initialize a new `App` instance.
:param config: Configuration for the pipeline, defaults to None
:type config: AppConfig, optional
:param db: The database to use for storing and retrieving embeddings, defaults to None
:type db: BaseVectorDB, optional
:param embedding_model: The embedding model used to calculate embeddings, defaults to None
:type embedding_model: BaseEmbedder, optional
:param llm: The LLM model used to calculate embeddings, defaults to None
:type llm: BaseLlm, optional
:param config_data: Config dictionary, defaults to None
:type config_data: dict, optional
:param log_level: Log level to use, defaults to logging.WARN
:type log_level: int, optional
:param auto_deploy: Whether to deploy the pipeline automatically, defaults to False
:type auto_deploy: bool, optional
:raises Exception: If an error occurs while creating the pipeline
"""
if id and config_data:
raise Exception("Cannot provide both id and config. Please provide only one of them.")
if id and name:
raise Exception("Cannot provide both id and name. Please provide only one of them.")
if name and config:
raise Exception("Cannot provide both name and config. Please provide only one of them.")
logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.logger = logging.getLogger(__name__)
self.auto_deploy = auto_deploy
# Store the dict config as an attribute to be able to send it
self.config_data = config_data if (config_data and validate_config(config_data)) else None
self.client = None
# pipeline_id from the backend
self.id = None
self.chunker = None
if chunker:
self.chunker = ChunkerConfig(**chunker)
self.cache_config = cache_config
self.config = config or AppConfig()
self.name = self.config.name
self.config.id = self.local_id = str(uuid.uuid4()) if self.config.id is None else self.config.id
if id is not None:
# Init client first since user is trying to fetch the pipeline
# details from the platform
self._init_client()
pipeline_details = self._get_pipeline(id)
self.config.id = self.local_id = pipeline_details["metadata"]["local_id"]
self.id = id
if name is not None:
self.name = name
self.embedding_model = embedding_model or OpenAIEmbedder()
self.db = db or ChromaDB()
self.llm = llm or OpenAILlm()
self._init_db()
# If cache_config is provided, initializing the cache ...
if self.cache_config is not None:
self._init_cache()
# Send anonymous telemetry
self._telemetry_props = {"class": self.__class__.__name__}
self.telemetry = AnonymousTelemetry(enabled=self.config.collect_metrics)
# Establish a connection to the SQLite database
self.connection = sqlite3.connect(SQLITE_PATH, check_same_thread=False)
self.cursor = self.connection.cursor()
# Create the 'data_sources' table if it doesn't exist
self.cursor.execute(
"""
CREATE TABLE IF NOT EXISTS data_sources (
pipeline_id TEXT,
hash TEXT,
type TEXT,
value TEXT,
metadata TEXT,
is_uploaded INTEGER DEFAULT 0,
PRIMARY KEY (pipeline_id, hash)
)
"""
)
self.connection.commit()
# Send anonymous telemetry
self.telemetry.capture(event_name="init", properties=self._telemetry_props)
self.user_asks = []
if self.auto_deploy:
self.deploy()
def _init_db(self):
"""
Initialize the database.
"""
self.db._set_embedder(self.embedding_model)
self.db._initialize()
self.db.set_collection_name(self.db.config.collection_name)
def _init_cache(self):
if self.cache_config.similarity_eval_config.strategy == "exact":
similarity_eval_func = ExactMatchEvaluation()
else:
similarity_eval_func = SearchDistanceEvaluation(
max_distance=self.cache_config.similarity_eval_config.max_distance,
positive=self.cache_config.similarity_eval_config.positive,
)
cache.init(
pre_embedding_func=gptcache_pre_function,
embedding_func=self.embedding_model.to_embeddings,
data_manager=gptcache_data_manager(vector_dimension=self.embedding_model.vector_dimension),
similarity_evaluation=similarity_eval_func,
config=Config(**self.cache_config.init_config.as_dict()),
)
def _init_client(self):
"""
Initialize the client.
"""
config = Client.load_config()
if config.get("api_key"):
self.client = Client()
else:
api_key = input(
"🔑 Enter your Embedchain API key. You can find the API key at https://app.embedchain.ai/settings/keys/ \n" # noqa: E501
)
self.client = Client(api_key=api_key)
def _get_pipeline(self, id):
"""
Get existing pipeline
"""
print("🛠️ Fetching pipeline details from the platform...")
url = f"{self.client.host}/api/v1/pipelines/{id}/cli/"
r = requests.get(
url,
headers={"Authorization": f"Token {self.client.api_key}"},
)
if r.status_code == 404:
raise Exception(f"❌ Pipeline with id {id} not found!")
print(
f"🎉 Pipeline loaded successfully! Pipeline url: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501
)
return r.json()
def _create_pipeline(self):
"""
Create a pipeline on the platform.
"""
print("🛠️ Creating pipeline on the platform...")
# self.config_data is a dict. Pass it inside the key 'yaml_config' to the backend
payload = {
"yaml_config": json.dumps(self.config_data),
"name": self.name,
"local_id": self.local_id,
}
url = f"{self.client.host}/api/v1/pipelines/cli/create/"
r = requests.post(
url,
json=payload,
headers={"Authorization": f"Token {self.client.api_key}"},
)
if r.status_code not in [200, 201]:
raise Exception(f"❌ Error occurred while creating pipeline. API response: {r.text}")
if r.status_code == 200:
print(
f"🎉🎉🎉 Existing pipeline found! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501
) # noqa: E501
elif r.status_code == 201:
print(
f"🎉🎉🎉 Pipeline created successfully! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501
)
return r.json()
def _get_presigned_url(self, data_type, data_value):
payload = {"data_type": data_type, "data_value": data_value}
r = requests.post(
f"{self.client.host}/api/v1/pipelines/{self.id}/cli/presigned_url/",
json=payload,
headers={"Authorization": f"Token {self.client.api_key}"},
)
r.raise_for_status()
return r.json()
def search(self, query, num_documents=3):
"""
Search for similar documents related to the query in the vector database.
"""
# Send anonymous telemetry
self.telemetry.capture(event_name="search", properties=self._telemetry_props)
# TODO: Search will call the endpoint rather than fetching the data from the db itself when deploy=True.
if self.id is None:
where = {"app_id": self.local_id}
context = self.db.query(
query,
n_results=num_documents,
where=where,
citations=True,
)
result = []
for c in context:
result.append({"context": c[0], "metadata": c[1]})
return result
else:
# Make API call to the backend to get the results
NotImplementedError("Search is not implemented yet for the prod mode.")
def _upload_file_to_presigned_url(self, presigned_url, file_path):
try:
with open(file_path, "rb") as file:
response = requests.put(presigned_url, data=file)
response.raise_for_status()
return response.status_code == 200
except Exception as e:
self.logger.exception(f"Error occurred during file upload: {str(e)}")
print("❌ Error occurred during file upload!")
return False
def _upload_data_to_pipeline(self, data_type, data_value, metadata=None):
payload = {
"data_type": data_type,
"data_value": data_value,
"metadata": metadata,
}
try:
self._send_api_request(f"/api/v1/pipelines/{self.id}/cli/add/", payload)
# print the local file path if user tries to upload a local file
printed_value = metadata.get("file_path") if metadata.get("file_path") else data_value
print(f"✅ Data of type: {data_type}, value: {printed_value} added successfully.")
except Exception as e:
print(f"❌ Error occurred during data upload for type {data_type}!. Error: {str(e)}")
def _send_api_request(self, endpoint, payload):
url = f"{self.client.host}{endpoint}"
headers = {"Authorization": f"Token {self.client.api_key}"}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response
def _process_and_upload_data(self, data_hash, data_type, data_value):
if os.path.isabs(data_value):
presigned_url_data = self._get_presigned_url(data_type, data_value)
presigned_url = presigned_url_data["presigned_url"]
s3_key = presigned_url_data["s3_key"]
if self._upload_file_to_presigned_url(presigned_url, file_path=data_value):
metadata = {"file_path": data_value, "s3_key": s3_key}
data_value = presigned_url
else:
self.logger.error(f"File upload failed for hash: {data_hash}")
return False
else:
if data_type == "qna_pair":
data_value = list(ast.literal_eval(data_value))
metadata = {}
try:
self._upload_data_to_pipeline(data_type, data_value, metadata)
self._mark_data_as_uploaded(data_hash)
return True
except Exception:
print(f"❌ Error occurred during data upload for hash {data_hash}!")
return False
def _mark_data_as_uploaded(self, data_hash):
self.cursor.execute(
"UPDATE data_sources SET is_uploaded = 1 WHERE hash = ? AND pipeline_id = ?",
(data_hash, self.local_id),
)
self.connection.commit()
def get_data_sources(self):
db_data = self.cursor.execute("SELECT * FROM data_sources WHERE pipeline_id = ?", (self.local_id,)).fetchall()
data_sources = []
for data in db_data:
data_sources.append({"data_type": data[2], "data_value": data[3], "metadata": data[4]})
return data_sources
def deploy(self):
if self.client is None:
self._init_client()
pipeline_data = self._create_pipeline()
self.id = pipeline_data["id"]
results = self.cursor.execute(
"SELECT * FROM data_sources WHERE pipeline_id = ? AND is_uploaded = 0", (self.local_id,) # noqa:E501
).fetchall()
if len(results) > 0:
print("🛠️ Adding data to your pipeline...")
for result in results:
data_hash, data_type, data_value = result[1], result[2], result[3]
self._process_and_upload_data(data_hash, data_type, data_value)
# Send anonymous telemetry
self.telemetry.capture(event_name="deploy", properties=self._telemetry_props)
@classmethod
def from_config(
cls,
config_path: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
auto_deploy: bool = False,
yaml_path: Optional[str] = None,
):
"""
Instantiate a Pipeline object from a configuration.
:param config_path: Path to the YAML or JSON configuration file.
:type config_path: Optional[str]
:param config: A dictionary containing the configuration.
:type config: Optional[Dict[str, Any]]
:param auto_deploy: Whether to deploy the pipeline automatically, defaults to False
:type auto_deploy: bool, optional
:param yaml_path: (Deprecated) Path to the YAML configuration file. Use config_path instead.
:type yaml_path: Optional[str]
:return: An instance of the Pipeline class.
:rtype: Pipeline
"""
# Backward compatibility for yaml_path
if yaml_path and not config_path:
config_path = yaml_path
if config_path and config:
raise ValueError("Please provide only one of config_path or config.")
config_data = None
if config_path:
file_extension = os.path.splitext(config_path)[1]
with open(config_path, "r") as file:
if file_extension in [".yaml", ".yml"]:
config_data = yaml.safe_load(file)
elif file_extension == ".json":
config_data = json.load(file)
else:
raise ValueError("config_path must be a path to a YAML or JSON file.")
elif config and isinstance(config, dict):
config_data = config
else:
logging.error(
"Please provide either a config file path (YAML or JSON) or a config dictionary. Falling back to defaults because no config is provided.", # noqa: E501
)
config_data = {}
try:
validate_config(config_data)
except Exception as e:
raise Exception(f"Error occurred while validating the config. Error: {str(e)}")
app_config_data = config_data.get("app", {}).get("config", {})
db_config_data = config_data.get("vectordb", {})
embedding_model_config_data = config_data.get("embedding_model", config_data.get("embedder", {}))
llm_config_data = config_data.get("llm", {})
chunker_config_data = config_data.get("chunker", {})
cache_config_data = config_data.get("cache", None)
app_config = AppConfig(**app_config_data)
db_provider = db_config_data.get("provider", "chroma")
db = VectorDBFactory.create(db_provider, db_config_data.get("config", {}))
if llm_config_data:
llm_provider = llm_config_data.get("provider", "openai")
llm = LlmFactory.create(llm_provider, llm_config_data.get("config", {}))
else:
llm = None
embedding_model_provider = embedding_model_config_data.get("provider", "openai")
embedding_model = EmbedderFactory.create(
embedding_model_provider, embedding_model_config_data.get("config", {})
)
if cache_config_data is not None:
cache_config = CacheConfig.from_config(cache_config_data)
else:
cache_config = None
# Send anonymous telemetry
event_properties = {"init_type": "config_data"}
AnonymousTelemetry().capture(event_name="init", properties=event_properties)
return cls(
config=app_config,
llm=llm,
db=db,
embedding_model=embedding_model,
config_data=config_data,
auto_deploy=auto_deploy,
chunker=chunker_config_data,
cache_config=cache_config,
)