From f6c4f86986e72a1b0d687c0f594d10a0dcc89f84 Mon Sep 17 00:00:00 2001 From: Deshraj Yadav Date: Fri, 27 Oct 2023 18:42:46 -0700 Subject: [PATCH] [Pipelines] Improvements in pipelines feature (#861) --- docs/mint.json | 4 +++ docs/pipelines/quickstart.mdx | 44 +++++++++++++++++++++++++ embedchain/pipeline.py | 62 ++++++++++++++++++++++++++++++----- pyproject.toml | 2 +- 4 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 docs/pipelines/quickstart.mdx diff --git a/docs/mint.json b/docs/mint.json index 0aee9c78..137a10cd 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -71,6 +71,10 @@ "group": "Examples", "pages": ["examples/full_stack", "examples/api_server", "examples/discord_bot", "examples/slack_bot", "examples/telegram_bot", "examples/whatsapp_bot", "examples/poe_bot"] }, + { + "group": "Pipelines", + "pages": ["pipelines/quickstart"] + }, { "group": "Community", "pages": [ diff --git a/docs/pipelines/quickstart.mdx b/docs/pipelines/quickstart.mdx new file mode 100644 index 00000000..660ca8a6 --- /dev/null +++ b/docs/pipelines/quickstart.mdx @@ -0,0 +1,44 @@ +--- +title: '🚀 Pipelines' +description: '💡 Start building LLM powered data pipelines in 1 minute' +--- + +Embedchain lets you build data pipelines on your own data sources and deploy it in production in less than a minute. It can load, index, retrieve, and sync any unstructured data. + +Install embedchain python package: + +```bash +pip install embedchain +``` + +Creating a pipeline involves 3 steps: + + + +```python +from embedchain import Pipeline +p = Pipeline(name="Elon Musk") +``` + + + +```python +# Add different data sources +p.add("https://en.wikipedia.org/wiki/Elon_Musk") +p.add("https://www.forbes.com/profile/elon-musk") +# You can also add local data sources such as pdf, csv files etc. +# p.add("/path/to/file.pdf") +``` + + +```python +p.deploy() +``` + + + +That's it. Now, head to the [Embedchain platform](https://app.embedchain.ai) and your pipeline is available there. 
Make sure to set the `OPENAI_API_KEY` 🔑 environment variable in the code. + +After you deploy your pipeline to the Embedchain platform, you can still add more data sources and update the pipeline multiple times. + +Here is a Google Colab notebook for you to get started: [![Open in Colab](https://camo.githubusercontent.com/84f0493939e0c4de4e6dbe113251b4bfb5353e57134ffd9fcab6b8714514d4d1/68747470733a2f2f636f6c61622e72657365617263682e676f6f676c652e636f6d2f6173736574732f636f6c61622d62616467652e737667)](https://colab.research.google.com/drive/1YVXaBO4yqlHZY4ho67GCJ6aD4CHNiScD?usp=sharing) diff --git a/embedchain/pipeline.py b/embedchain/pipeline.py index 3e1ee385..420eaf7f 100644 --- a/embedchain/pipeline.py +++ b/embedchain/pipeline.py @@ -34,6 +34,8 @@ class Pipeline(EmbedChain): def __init__( self, + id: str = None, + name: str = None, config: PipelineConfig = None, db: BaseVectorDB = None, embedding_model: BaseEmbedder = None, @@ -61,6 +63,15 @@ class Pipeline(EmbedChain): :type auto_deploy: bool, optional :raises Exception: If an error occurs while creating the pipeline """ + if id and yaml_path: + raise Exception("Cannot provide both id and config. Please provide only one of them.") + + if id and name: + raise Exception("Cannot provide both id and name. Please provide only one of them.") + + if name and config: + raise Exception("Cannot provide both name and config. 
Please provide only one of them.") + logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") self.logger = logging.getLogger(__name__) @@ -71,16 +82,28 @@ class Pipeline(EmbedChain): self.client = None # pipeline_id from the backend self.id = None - if yaml_path: - with open(yaml_path, "r") as file: - config_data = yaml.safe_load(file) - self.yaml_config = config_data self.config = config or PipelineConfig() self.name = self.config.name self.config.id = self.local_id = str(uuid.uuid4()) if self.config.id is None else self.config.id + if yaml_path: + with open(yaml_path, "r") as file: + config_data = yaml.safe_load(file) + self.yaml_config = config_data + + if id is not None: + # Init client first since user is trying to fetch the pipeline + # details from the platform + self._init_client() + pipeline_details = self._get_pipeline(id) + self.config.id = self.local_id = pipeline_details["metadata"]["local_id"] + self.id = id + + if name is not None: + self.name = name + self.embedding_model = embedding_model or OpenAIEmbedder() self.db = db or ChromaDB() self.llm = llm or OpenAILlm() @@ -134,6 +157,24 @@ class Pipeline(EmbedChain): ) self.client = Client(api_key=api_key) + def _get_pipeline(self, id): + """ + Get existing pipeline + """ + print("🛠️ Fetching pipeline details from the platform...") + url = f"{self.client.host}/api/v1/pipelines/{id}/cli/" + r = requests.get( + url, + headers={"Authorization": f"Token {self.client.api_key}"}, + ) + if r.status_code == 404: + raise Exception(f"❌ Pipeline with id {id} not found!") + + print( + f"🎉 Pipeline loaded successfully! Pipeline url: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501 + ) + return r.json() + def _create_pipeline(self): """ Create a pipeline on the platform. @@ -154,9 +195,14 @@ class Pipeline(EmbedChain): if r.status_code not in [200, 201]: raise Exception(f"❌ Error occurred while creating pipeline. 
API response: {r.text}") - print( - f"🎉🎉🎉 Pipeline created successfully! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501 - ) + if r.status_code == 200: + print( + f"🎉🎉🎉 Existing pipeline found! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501 + ) # noqa: E501 + elif r.status_code == 201: + print( + f"🎉🎉🎉 Pipeline created successfully! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n" # noqa: E501 + ) return r.json() def _get_presigned_url(self, data_type, data_value): @@ -257,7 +303,7 @@ class Pipeline(EmbedChain): self.id = pipeline_data["id"] results = self.cursor.execute( - "SELECT * FROM data_sources WHERE pipeline_id = ? AND is_uploaded = 0", (self.local_id,) + "SELECT * FROM data_sources WHERE pipeline_id = ? AND is_uploaded = 0", (self.local_id,) # noqa:E501 ).fetchall() if len(results) > 0: diff --git a/pyproject.toml b/pyproject.toml index 0e193185..b0fdfce7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "embedchain" -version = "0.0.78" +version = "0.0.79" description = "Data platform for LLMs - Load, index, retrieve and sync any unstructured data" authors = ["Taranjeet Singh, Deshraj Yadav"] license = "Apache License"