Support Custom Search Query for Elasticsearch (#2372)
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from typing import Any, Dict, Optional
|
||||
from collections.abc import Callable
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
@@ -15,6 +16,10 @@ class ElasticsearchConfig(BaseModel):
|
||||
verify_certs: bool = Field(True, description="Verify SSL certificates")
|
||||
use_ssl: bool = Field(True, description="Use SSL for connection")
|
||||
auto_create_index: bool = Field(True, description="Automatically create index during initialization")
|
||||
custom_search_query: Optional[Callable[[List[float], int, Optional[Dict]], Dict]] = Field(
|
||||
None,
|
||||
description="Custom search query function. Parameters: (query, limit, filters) -> Dict"
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
|
||||
@@ -45,6 +45,11 @@ class ElasticsearchDB(VectorStoreBase):
|
||||
# Create index only if auto_create_index is True
|
||||
if config.auto_create_index:
|
||||
self.create_index()
|
||||
|
||||
if config.custom_search_query:
|
||||
self.custom_search_query = config.custom_search_query
|
||||
else:
|
||||
self.custom_search_query = None
|
||||
|
||||
def create_index(self) -> None:
|
||||
"""Create Elasticsearch index with proper mappings if it doesn't exist"""
|
||||
@@ -117,25 +122,20 @@ class ElasticsearchDB(VectorStoreBase):
|
||||
return results
|
||||
|
||||
def search(self, query: List[float], limit: int = 5, filters: Optional[Dict] = None) -> List[OutputData]:
|
||||
"""Search for similar vectors using KNN search with pre-filtering."""
|
||||
if not filters:
|
||||
# If no filters, just do KNN search
|
||||
search_query = {"knn": {"field": "vector", "query_vector": query, "k": limit, "num_candidates": limit * 2}}
|
||||
"""
|
||||
Search with two options:
|
||||
1. Use custom search query if provided
|
||||
2. Use KNN search on vectors with pre-filtering if no custom search query is provided
|
||||
"""
|
||||
if self.custom_search_query:
|
||||
search_query = self.custom_search_query(query, limit, filters)
|
||||
else:
|
||||
# If filters exist, apply them with KNN search
|
||||
filter_conditions = []
|
||||
for key, value in filters.items():
|
||||
filter_conditions.append({"term": {f"metadata.{key}": value}})
|
||||
|
||||
search_query = {
|
||||
"knn": {
|
||||
"field": "vector",
|
||||
"query_vector": query,
|
||||
"k": limit,
|
||||
"num_candidates": limit * 2,
|
||||
"filter": {"bool": {"must": filter_conditions}},
|
||||
}
|
||||
}
|
||||
search_query = {"knn": {"field": "vector", "query_vector": query, "k": limit, "num_candidates": limit * 2}}
|
||||
if filters:
|
||||
filter_conditions = []
|
||||
for key, value in filters.items():
|
||||
filter_conditions.append({"term": {f"metadata.{key}": value}})
|
||||
search_query["filter"] = {"bool": {"must": filter_conditions}}
|
||||
|
||||
response = self.client.search(index=self.collection_name, body=search_query)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user