feature: Add support for zilliz vector database (#771)

This commit is contained in:
LuciAkirami
2023-10-12 01:47:33 +05:30
committed by GitHub
parent 16e123b7bb
commit d6ed2050d4
7 changed files with 438 additions and 0 deletions

View File

@@ -12,3 +12,4 @@ from .llm.base_llm_config import BaseLlmConfig as LlmConfig
from .vectordb.chroma import ChromaDbConfig
from .vectordb.elasticsearch import ElasticsearchDBConfig
from .vectordb.opensearch import OpenSearchDBConfig
from .vectordb.zilliz import ZillizDBConfig

View File

@@ -0,0 +1,49 @@
import os
from typing import Optional
from embedchain.config.vectordb.base import BaseVectorDbConfig
from embedchain.helper.json_serializable import register_deserializable
@register_deserializable
class ZillizDBConfig(BaseVectorDbConfig):
    """Connection and collection settings for the Zilliz vector database."""

    def __init__(
        self,
        collection_name: Optional[str] = None,
        dir: Optional[str] = None,
        uri: Optional[str] = None,
        token: Optional[str] = None,
        vector_dim: Optional[str] = None,
        metric_type: Optional[str] = None,
    ):
        """
        Build the configuration, falling back to environment variables for credentials.

        :param collection_name: Name of the collection, forwarded to the base config, defaults to None
        :type collection_name: Optional[str], optional
        :param dir: Database directory, forwarded to the base config, defaults to None
        :type dir: Optional[str], optional
        :param uri: Cluster endpoint obtained from the Zilliz Console; when not given,
            the `ZILLIZ_CLOUD_URI` environment variable is used
        :type uri: Optional[str], optional
        :param token: API key (Serverless Cluster) or 'username:password' (Dedicated Cluster);
            when not given, the `ZILLIZ_CLOUD_TOKEN` environment variable is used
        :type token: Optional[str], optional
        :param vector_dim: Stored unchanged on the instance, defaults to None
        :type vector_dim: Optional[str], optional
        :param metric_type: Metric type stored on the instance, "L2" when not given
        :type metric_type: Optional[str], optional
        :raises AttributeError: If no URI or no token can be resolved.
        """
        # Explicit arguments win; the environment is only the fallback.
        resolved_uri = uri if uri else os.getenv("ZILLIZ_CLOUD_URI")
        self.uri = resolved_uri
        if not resolved_uri:
            missing_uri_message = (
                "Zilliz needs a URI attribute, "
                "this can either be passed to `ZILLIZ_CLOUD_URI` or as `ZILLIZ_CLOUD_URI` in `.env`"
            )
            raise AttributeError(missing_uri_message)

        resolved_token = token if token else os.getenv("ZILLIZ_CLOUD_TOKEN")
        self.token = resolved_token
        if not resolved_token:
            missing_token_message = (
                "Zilliz needs a token attribute, "
                "this can either be passed to `ZILLIZ_CLOUD_TOKEN` or as `ZILLIZ_CLOUD_TOKEN` in `.env`,"
                "if having a username and password, pass it in the form 'username:password' to `ZILLIZ_CLOUD_TOKEN`"
            )
            raise AttributeError(missing_token_message)

        self.metric_type = metric_type or "L2"
        self.vector_dim = vector_dim

        # The base class is initialised last, once the Zilliz-specific fields are in place.
        super().__init__(collection_name=collection_name, dir=dir)

View File

@@ -5,3 +5,4 @@ class VectorDatabases(Enum):
CHROMADB = "CHROMADB"
ELASTICSEARCH = "ELASTICSEARCH"
OPENSEARCH = "OPENSEARCH"
ZILLIZ = "ZILLIZ"

View File

@@ -0,0 +1,205 @@
from typing import Dict, List, Optional
from embedchain.config import ZillizDBConfig
from embedchain.helper.json_serializable import register_deserializable
from embedchain.vectordb.base import BaseVectorDB
try:
from pymilvus import MilvusClient
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
except ImportError:
raise ImportError(
"Zilliz requires extra dependencies. Install with `pip install --upgrade embedchain[milvus]`"
) from None
@register_deserializable
class ZillizVectorDB(BaseVectorDB):
    """Vector database implementation backed by Zilliz, accessed through pymilvus."""

    def __init__(self, config: Optional[ZillizDBConfig] = None):
        """Initialize the database. Save the config and client as an attribute.

        :param config: Database configuration class instance. When omitted, a default
            `ZillizDBConfig` is created.
        :type config: ZillizDBConfig
        """
        if config is None:
            self.config = ZillizDBConfig()
        else:
            self.config = config

        # Two handles are kept: the `MilvusClient` is used for insert/query/search/drop,
        # while `Collection` / `utility` calls below go through `connections.connect`.
        self.client = MilvusClient(
            uri=self.config.uri,
            token=self.config.token,
        )
        self.connection = connections.connect(
            uri=self.config.uri,
            token=self.config.token,
        )

        super().__init__(config=self.config)

    def _initialize(self):
        """
        Create or load the configured collection.

        This method is needed because the `embedder` attribute has to be set externally before
        the collection can be created, so it can't be done in `__init__` in one step.
        """
        self._get_or_create_collection(self.config.collection_name)

    def _get_or_create_db(self):
        """Get or create the database.

        :return: The `MilvusClient` created in `__init__`.
        """
        return self.client

    def _get_or_create_collection(self, name):
        """
        Get or create a named collection and store it on `self.collection`.

        :param name: Name of the collection
        :type name: str
        :return: The existing or newly created collection.
        """
        if utility.has_collection(name):
            self.collection = Collection(name)
        else:
            # Fixed schema: primary key, raw text and the embedding vector. Additional metadata
            # keys are accepted because dynamic fields are enabled on the schema.
            fields = [
                FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=512),
                FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
                FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=self.embedder.vector_dimension),
            ]
            schema = CollectionSchema(fields, enable_dynamic_field=True)
            self.collection = Collection(name=name, schema=schema)

            index = {
                "index_type": "AUTOINDEX",
                "metric_type": self.config.metric_type,
            }
            self.collection.create_index("embeddings", index)
        return self.collection

    def get(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, any]] = None,
        limit: Optional[int] = None,
    ):
        """
        Get existing doc ids present in vector database

        :param ids: list of doc ids to check for existence
        :type ids: List[str]
        :param where: Optional. Accepted for interface compatibility; not used by this implementation.
        :type where: Dict[str, Any]
        :param limit: Optional. Accepted for interface compatibility; not used by this implementation.
        :type limit: Optional[int]
        :return: A dict with a single key "ids": an empty list when nothing was requested or the
            collection is empty, otherwise the set of requested ids that exist.
        :rtype: Dict[str, Union[List[str], Set[str]]]
        """
        # Both emptiness checks are kept in the guard so that no path can reach the
        # return statement without a query result.
        if not ids or self.collection.num_entities == 0 or self.collection.is_empty:
            return {"ids": []}

        id_filter = f"id in {ids}"
        rows = self.client.query(
            collection_name=self.config.collection_name, filter=id_filter, output_fields=["id"]
        )
        return {"ids": {row["id"] for row in rows}}

    def add(
        self,
        embeddings: List[List[float]],
        documents: List[str],
        metadatas: List[object],
        ids: List[str],
        skip_embedding: bool,
    ):
        """Add documents to the collection.

        :param embeddings: Pre-computed embeddings; only used when `skip_embedding` is true.
        :type embeddings: List[List[float]]
        :param documents: Document texts, stored in the "text" field.
        :type documents: List[str]
        :param metadatas: One mapping per document; its keys are stored alongside the fixed fields.
        :type metadatas: List[object]
        :param ids: Primary keys, one per document.
        :type ids: List[str]
        :param skip_embedding: When false, embeddings are computed from `documents` with the embedder.
        :type skip_embedding: bool
        """
        if not skip_embedding:
            embeddings = self.embedder.embedding_fn(documents)

        for doc_id, doc, metadata, embedding in zip(ids, documents, metadatas, embeddings):
            # The fixed fields come last so they win over same-named metadata keys.
            data = {**metadata, "id": doc_id, "text": doc, "embeddings": embedding}
            self.client.insert(collection_name=self.config.collection_name, data=data)

        self.collection.load()
        self.collection.flush()
        self.client.flush(self.config.collection_name)

    def query(self, input_query: List[str], n_results: int, where: Dict[str, any], skip_embedding: bool) -> List[str]:
        """
        Query contents from vector data base based on vector similarity

        :param input_query: list of query string, or the ready-made search data when
            `skip_embedding` is true
        :type input_query: List[str]
        :param n_results: no of similar documents to fetch from database
        :type n_results: int
        :param where: filter expression; only a non-empty string is forwarded to the search,
            any other value is ignored
        :type where: str
        :param skip_embedding: When true, `input_query` is passed to the search unchanged;
            otherwise it is embedded first.
        :type skip_embedding: bool
        :return: The content of the documents that matched your query.
        :rtype: List[str]
        """
        if self.collection.is_empty:
            return []

        if skip_embedding:
            search_data = input_query
        else:
            input_query_vector = self.embedder.embedding_fn([input_query])
            search_data = [input_query_vector[0]]

        search_kwargs = {
            "collection_name": self.config.collection_name,
            "data": search_data,
            "limit": n_results,
            "output_fields": ["text"],
        }
        # Only string filters are supported; the keyword is omitted otherwise so the
        # search call is unchanged for callers that pass a dict or None.
        if isinstance(where, str) and where:
            search_kwargs["filter"] = where

        query_result = self.client.search(**search_kwargs)

        # The search returns one list of hits per query vector; collect every hit,
        # not just the first one, so that up to `n_results` documents are returned.
        doc_list = []
        for hits in query_result:
            doc_list.extend(hit["entity"]["text"] for hit in hits)
        return doc_list

    def count(self) -> int:
        """
        Count number of documents/chunks embedded in the database.

        :return: number of documents
        :rtype: int
        """
        return self.collection.num_entities

    def reset(self, collection_names: List[str] = None):
        """
        Resets the database. Deletes all embeddings irreversibly.

        Does nothing when no collection name is configured. Afterwards the configured
        collection is fetched or re-created.

        :param collection_names: Optional. Collections to drop (names that do not exist are
            skipped). When omitted, only the configured collection is dropped.
        :type collection_names: List[str]
        """
        if self.config.collection_name:
            if collection_names:
                for collection_name in collection_names:
                    if collection_name in self.client.list_collections():
                        self.client.drop_collection(collection_name=collection_name)
            else:
                self.client.drop_collection(collection_name=self.config.collection_name)
            self._get_or_create_collection(self.config.collection_name)

    def set_collection_name(self, name: str):
        """
        Set the name of the collection. A collection is an isolated space for vectors.

        :param name: Name of the collection.
        :type name: str
        :raises TypeError: If `name` is not a string.
        """
        if not isinstance(name, str):
            raise TypeError("Collection name must be a string")
        self.config.collection_name = name