Fix/es query filter (🚨 URGENT) (#2162)
This commit is contained in:
@@ -49,18 +49,28 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
def create_index(self) -> None:
|
def create_index(self) -> None:
|
||||||
"""Create Elasticsearch index with proper mappings if it doesn't exist"""
|
"""Create Elasticsearch index with proper mappings if it doesn't exist"""
|
||||||
index_settings = {
|
index_settings = {
|
||||||
|
"settings": {
|
||||||
|
"index": {
|
||||||
|
"number_of_replicas": 1,
|
||||||
|
"number_of_shards": 5,
|
||||||
|
"refresh_interval": "1s"
|
||||||
|
}
|
||||||
|
},
|
||||||
"mappings": {
|
"mappings": {
|
||||||
"properties": {
|
"properties": {
|
||||||
"text": {"type": "text"},
|
"text": {"type": "text"},
|
||||||
"embedding": {
|
"vector": {
|
||||||
"type": "dense_vector",
|
"type": "dense_vector",
|
||||||
"dims": self.vector_dim,
|
"dims": self.vector_dim,
|
||||||
"index": True,
|
"index": True,
|
||||||
"similarity": "cosine",
|
"similarity": "cosine"
|
||||||
},
|
},
|
||||||
"metadata": {"type": "object"},
|
"metadata": {
|
||||||
"user_id": {"type": "keyword"},
|
"type": "object",
|
||||||
"hash": {"type": "keyword"},
|
"properties": {
|
||||||
|
"user_id": {"type": "keyword"}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -99,43 +109,76 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
|
|
||||||
actions = []
|
actions = []
|
||||||
for i, (vec, id_) in enumerate(zip(vectors, ids)):
|
for i, (vec, id_) in enumerate(zip(vectors, ids)):
|
||||||
action = {"_index": self.collection_name, "_id": id_, "vector": vec, "payload": payloads[i]}
|
action = {
|
||||||
|
"_index": self.collection_name,
|
||||||
|
"_id": id_,
|
||||||
|
"_source": {
|
||||||
|
"vector": vec,
|
||||||
|
"metadata": payloads[i] # Store all metadata in the metadata field
|
||||||
|
}
|
||||||
|
}
|
||||||
actions.append(action)
|
actions.append(action)
|
||||||
|
|
||||||
bulk(self.client, actions)
|
bulk(self.client, actions)
|
||||||
|
|
||||||
# Return OutputData objects for inserted documents
|
|
||||||
results = []
|
results = []
|
||||||
for i, id_ in enumerate(ids):
|
for i, id_ in enumerate(ids):
|
||||||
results.append(
|
results.append(
|
||||||
OutputData(
|
OutputData(
|
||||||
id=id_,
|
id=id_,
|
||||||
score=1.0, # Default score for inserts
|
score=1.0, # Default score for inserts
|
||||||
payload=payloads[i],
|
payload=payloads[i]
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def search(self, query: List[float], limit: int = 5, filters: Optional[Dict] = None) -> List[OutputData]:
|
def search(self, query: List[float], limit: int = 5, filters: Optional[Dict] = None) -> List[OutputData]:
|
||||||
"""Search for similar vectors using KNN search with pre-filtering."""
|
"""Search for similar vectors using KNN search with pre-filtering."""
|
||||||
search_query = {
|
if not filters:
|
||||||
"query": {
|
# If no filters, just do KNN search
|
||||||
"bool": {
|
search_query = {
|
||||||
"must": [
|
"knn": {
|
||||||
# Exact match filters for memory isolation
|
"field": "vector",
|
||||||
*({"term": {f"payload.{k}": v}} for k, v in (filters or {}).items()),
|
"query_vector": query,
|
||||||
# KNN vector search
|
"k": limit,
|
||||||
{"knn": {"vector": {"vector": query, "k": limit}}},
|
"num_candidates": limit * 2
|
||||||
]
|
}
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# If filters exist, apply them with KNN search
|
||||||
|
filter_conditions = []
|
||||||
|
for key, value in filters.items():
|
||||||
|
filter_conditions.append({
|
||||||
|
"term": {
|
||||||
|
f"metadata.{key}": value
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
search_query = {
|
||||||
|
"knn": {
|
||||||
|
"field": "vector",
|
||||||
|
"query_vector": query,
|
||||||
|
"k": limit,
|
||||||
|
"num_candidates": limit * 2,
|
||||||
|
"filter": {
|
||||||
|
"bool": {
|
||||||
|
"must": filter_conditions
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
response = self.client.search(index=self.collection_name, body=search_query)
|
response = self.client.search(index=self.collection_name, body=search_query)
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
for hit in response["hits"]["hits"]:
|
for hit in response["hits"]["hits"]:
|
||||||
results.append(OutputData(id=hit["_id"], score=hit["_score"], payload=hit["_source"].get("payload", {})))
|
results.append(
|
||||||
|
OutputData(
|
||||||
|
id=hit["_id"],
|
||||||
|
score=hit["_score"],
|
||||||
|
payload=hit.get("_source", {}).get("metadata", {})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@@ -149,7 +192,7 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
if vector is not None:
|
if vector is not None:
|
||||||
doc["vector"] = vector
|
doc["vector"] = vector
|
||||||
if payload is not None:
|
if payload is not None:
|
||||||
doc["payload"] = payload
|
doc["metadata"] = payload
|
||||||
|
|
||||||
self.client.update(index=self.collection_name, id=vector_id, body={"doc": doc})
|
self.client.update(index=self.collection_name, id=vector_id, body={"doc": doc})
|
||||||
|
|
||||||
@@ -160,7 +203,7 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
return OutputData(
|
return OutputData(
|
||||||
id=response["_id"],
|
id=response["_id"],
|
||||||
score=1.0, # Default score for direct get
|
score=1.0, # Default score for direct get
|
||||||
payload=response["_source"].get("payload", {}),
|
payload=response["_source"].get("metadata", {})
|
||||||
)
|
)
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logger.warning(f"Missing key in Elasticsearch response: {e}")
|
logger.warning(f"Missing key in Elasticsearch response: {e}")
|
||||||
@@ -189,7 +232,18 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
query: Dict[str, Any] = {"query": {"match_all": {}}}
|
query: Dict[str, Any] = {"query": {"match_all": {}}}
|
||||||
|
|
||||||
if filters:
|
if filters:
|
||||||
query["query"] = {"bool": {"must": [{"match": {f"payload.{k}": v}} for k, v in filters.items()]}}
|
filter_conditions = []
|
||||||
|
for key, value in filters.items():
|
||||||
|
filter_conditions.append({
|
||||||
|
"term": {
|
||||||
|
f"metadata.{key}": value
|
||||||
|
}
|
||||||
|
})
|
||||||
|
query["query"] = {
|
||||||
|
"bool": {
|
||||||
|
"must": filter_conditions
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if limit:
|
if limit:
|
||||||
query["size"] = limit
|
query["size"] = limit
|
||||||
@@ -202,7 +256,7 @@ class ElasticsearchDB(VectorStoreBase):
|
|||||||
OutputData(
|
OutputData(
|
||||||
id=hit["_id"],
|
id=hit["_id"],
|
||||||
score=1.0, # Default score for list operation
|
score=1.0, # Default score for list operation
|
||||||
payload=hit["_source"].get("payload", {}),
|
payload=hit.get("_source", {}).get("metadata", {})
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -92,13 +92,11 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
# Verify field mappings
|
# Verify field mappings
|
||||||
mappings = create_args["body"]["mappings"]["properties"]
|
mappings = create_args["body"]["mappings"]["properties"]
|
||||||
self.assertEqual(mappings["text"]["type"], "text")
|
self.assertEqual(mappings["text"]["type"], "text")
|
||||||
self.assertEqual(mappings["embedding"]["type"], "dense_vector")
|
self.assertEqual(mappings["vector"]["type"], "dense_vector")
|
||||||
self.assertEqual(mappings["embedding"]["dims"], 1536)
|
self.assertEqual(mappings["vector"]["dims"], 1536)
|
||||||
self.assertEqual(mappings["embedding"]["index"], True)
|
self.assertEqual(mappings["vector"]["index"], True)
|
||||||
self.assertEqual(mappings["embedding"]["similarity"], "cosine")
|
self.assertEqual(mappings["vector"]["similarity"], "cosine")
|
||||||
self.assertEqual(mappings["metadata"]["type"], "object")
|
self.assertEqual(mappings["metadata"]["type"], "object")
|
||||||
self.assertEqual(mappings["user_id"]["type"], "keyword")
|
|
||||||
self.assertEqual(mappings["hash"]["type"], "keyword")
|
|
||||||
|
|
||||||
# Reset mocks for next test
|
# Reset mocks for next test
|
||||||
self.client_mock.reset_mock()
|
self.client_mock.reset_mock()
|
||||||
@@ -170,8 +168,8 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
self.assertEqual(len(actions), 2)
|
self.assertEqual(len(actions), 2)
|
||||||
self.assertEqual(actions[0]["_index"], "test_collection")
|
self.assertEqual(actions[0]["_index"], "test_collection")
|
||||||
self.assertEqual(actions[0]["_id"], "id1")
|
self.assertEqual(actions[0]["_id"], "id1")
|
||||||
self.assertEqual(actions[0]["vector"], vectors[0])
|
self.assertEqual(actions[0]["_source"]["vector"], vectors[0])
|
||||||
self.assertEqual(actions[0]["payload"], payloads[0])
|
self.assertEqual(actions[0]["_source"]["metadata"], payloads[0])
|
||||||
|
|
||||||
# Verify returned objects
|
# Verify returned objects
|
||||||
self.assertEqual(len(results), 2)
|
self.assertEqual(len(results), 2)
|
||||||
@@ -189,7 +187,7 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
"_score": 0.8,
|
"_score": 0.8,
|
||||||
"_source": {
|
"_source": {
|
||||||
"vector": [0.1] * 1536,
|
"vector": [0.1] * 1536,
|
||||||
"payload": {"key1": "value1"}
|
"metadata": {"key1": "value1"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@@ -210,14 +208,11 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
body = search_args["body"]
|
body = search_args["body"]
|
||||||
|
|
||||||
# Verify KNN query structure
|
# Verify KNN query structure
|
||||||
self.assertIn("query", body)
|
self.assertIn("knn", body)
|
||||||
self.assertIn("bool", body["query"])
|
self.assertEqual(body["knn"]["field"], "vector")
|
||||||
self.assertIn("must", body["query"]["bool"])
|
self.assertEqual(body["knn"]["query_vector"], query_vector)
|
||||||
|
self.assertEqual(body["knn"]["k"], 5)
|
||||||
# Verify KNN parameters
|
self.assertEqual(body["knn"]["num_candidates"], 10)
|
||||||
knn_query = body["query"]["bool"]["must"][-1]["knn"]["vector"]
|
|
||||||
self.assertEqual(knn_query["vector"], query_vector)
|
|
||||||
self.assertEqual(knn_query["k"], 5)
|
|
||||||
|
|
||||||
# Verify results
|
# Verify results
|
||||||
self.assertEqual(len(results), 1)
|
self.assertEqual(len(results), 1)
|
||||||
@@ -231,10 +226,8 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
"_id": "id1",
|
"_id": "id1",
|
||||||
"_source": {
|
"_source": {
|
||||||
"vector": [0.1] * 1536,
|
"vector": [0.1] * 1536,
|
||||||
"payload": {"key": "value"},
|
"metadata": {"key": "value"},
|
||||||
"text": "sample text",
|
"text": "sample text"
|
||||||
"user_id": "test_user",
|
|
||||||
"hash": "sample_hash"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.client_mock.get.return_value = mock_response
|
self.client_mock.get.return_value = mock_response
|
||||||
@@ -248,17 +241,11 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
id="id1"
|
id="id1"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Basic assertions that should pass if OutputData is created correctly
|
# Verify result
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertTrue(hasattr(result, 'id'))
|
self.assertEqual(result.id, "id1")
|
||||||
self.assertTrue(hasattr(result, 'score'))
|
self.assertEqual(result.score, 1.0)
|
||||||
self.assertTrue(hasattr(result, 'payload'))
|
self.assertEqual(result.payload, {"key": "value"})
|
||||||
|
|
||||||
# If the above assertions pass, we can safely check the values
|
|
||||||
if result is not None: # This satisfies the linter
|
|
||||||
self.assertEqual(result.id, "id1")
|
|
||||||
self.assertEqual(result.score, 1.0)
|
|
||||||
self.assertEqual(result.payload, {"key": "value"})
|
|
||||||
|
|
||||||
def test_get_not_found(self):
|
def test_get_not_found(self):
|
||||||
# Mock get raising exception
|
# Mock get raising exception
|
||||||
@@ -277,7 +264,7 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
"_id": "id1",
|
"_id": "id1",
|
||||||
"_source": {
|
"_source": {
|
||||||
"vector": [0.1] * 1536,
|
"vector": [0.1] * 1536,
|
||||||
"payload": {"key1": "value1"}
|
"metadata": {"key1": "value1"}
|
||||||
},
|
},
|
||||||
"_score": 1.0
|
"_score": 1.0
|
||||||
},
|
},
|
||||||
@@ -285,7 +272,7 @@ class TestElasticsearchDB(unittest.TestCase):
|
|||||||
"_id": "id2",
|
"_id": "id2",
|
||||||
"_source": {
|
"_source": {
|
||||||
"vector": [0.2] * 1536,
|
"vector": [0.2] * 1536,
|
||||||
"payload": {"key2": "value2"}
|
"metadata": {"key2": "value2"}
|
||||||
},
|
},
|
||||||
"_score": 0.8
|
"_score": 0.8
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user