Files
caoxiaozhu 68f663f2f4 feat: 重构知识库系统,移除Hermes集成,增强RAG和同步功能
主要变更:
- 移除Hermes智能体及相关回调服务
- 新增知识库RAG、同步、调度、规范化和索引任务服务
- 重构orchestrator服务,增强运行时聊天功能
- 更新前端聊天、政策制度、设置等页面样式和逻辑
- 更新expense_claims和document_intelligence服务
- 删除llm_wiki相关服务和测试文件
- 更新docker-compose配置和启动脚本
2026-05-17 08:38:41 +00:00

1052 lines
41 KiB
Python

import asyncio
import configparser
import hashlib
import json
import os
import uuid
from dataclasses import dataclass
from typing import Any, List, final
import numpy as np
import pipmaster as pm
from ..base import BaseVectorStorage
from ..exceptions import DataMigrationError
from ..kg.shared_storage import get_data_init_lock
from ..utils import compute_mdhash_id, logger
if not pm.is_installed("qdrant-client"):
pm.install("qdrant-client")
from qdrant_client import QdrantClient, models # type: ignore
# Workspace label used when neither QDRANT_WORKSPACE nor a workspace argument is set.
DEFAULT_WORKSPACE = "_"
# Payload key holding the owning workspace; it is indexed and used in workspace filters.
WORKSPACE_ID_FIELD = "workspace_id"
# Prefix passed to compute_mdhash_id when deriving an entity ID from its name.
ENTITY_PREFIX = "ent-"
# Payload key holding the Unix timestamp written at upsert time.
CREATED_AT_FIELD = "created_at"
# Payload key holding the caller-supplied (pre-hash) record ID.
ID_FIELD = "id"
# Default upsert batch limits; overridable through the QDRANT_UPSERT_MAX_PAYLOAD_BYTES
# and QDRANT_UPSERT_MAX_POINTS_PER_BATCH environment variables.
DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES = 16 * 1024 * 1024 # 16MB
DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH = 128
# Optional config.ini; its [qdrant] section supplies fallback uri/apikey values
# when the QDRANT_URL / QDRANT_API_KEY environment variables are not set.
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
def compute_mdhash_id_for_qdrant(
content: str, prefix: str = "", style: str = "simple"
) -> str:
"""
Generate a UUID based on the content and support multiple formats.
:param content: The content used to generate the UUID.
:param style: The format of the UUID, optional values are "simple", "hyphenated", "urn".
:return: A UUID that meets the requirements of Qdrant.
"""
if not content:
raise ValueError("Content must not be empty.")
# Use the hash value of the content to create a UUID.
hashed_content = hashlib.sha256((prefix + content).encode("utf-8")).digest()
generated_uuid = uuid.UUID(bytes=hashed_content[:16], version=4)
# Return the UUID according to the specified format.
if style == "simple":
return generated_uuid.hex
elif style == "hyphenated":
return str(generated_uuid)
elif style == "urn":
return f"urn:uuid:{generated_uuid}"
else:
raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
def workspace_filter_condition(workspace: str) -> models.FieldCondition:
    """
    Build the Qdrant field condition that matches points owned by a workspace.

    The condition compares the WORKSPACE_ID_FIELD payload key against the
    given workspace value.
    """
    workspace_match = models.MatchValue(value=workspace)
    return models.FieldCondition(key=WORKSPACE_ID_FIELD, match=workspace_match)
def _find_legacy_collection(
    client: QdrantClient,
    namespace: str,
    workspace: str | None = None,
    model_suffix: str | None = None,
) -> str | None:
    """
    Find a legacy collection, trying the naming patterns of older versions.

    Candidates are probed in this order and the first existing one wins:
    1. lightrag_vdb_{namespace} - only ranked first when model_suffix is provided
    2. {workspace}_{namespace} - only when workspace is provided
    3. lightrag_vdb_{namespace} - fallback when model_suffix is not provided
    4. {namespace}

    Args:
        client: QdrantClient instance
        namespace: Base namespace (e.g., "chunks", "entities")
        workspace: Optional workspace identifier
        model_suffix: Optional model suffix of the new collection

    Returns:
        Collection name if found, None otherwise
    """
    unsuffixed_name = f"lightrag_vdb_{namespace}"

    ordered_names: list[str] = []
    if model_suffix:
        ordered_names.append(unsuffixed_name)
    if workspace:
        ordered_names.append(f"{workspace}_{namespace}")
    ordered_names.extend([unsuffixed_name, namespace])

    # dict.fromkeys keeps first-seen order while dropping repeats, so the same
    # name is never sent to the server twice.
    for candidate in dict.fromkeys(ordered_names):
        if not candidate:
            continue
        if client.collection_exists(candidate):
            logger.info(
                f"Qdrant: Found legacy collection '{candidate}' "
                f"(namespace={namespace}, workspace={workspace or 'none'})"
            )
            return candidate
    return None
@final
@dataclass
class QdrantVectorDBStorage(BaseVectorStorage):
def __init__(
    self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
):
    """
    Forward the storage settings to the base class, then run __post_init__.

    A missing workspace is passed on as an empty string and missing
    meta_fields as an empty set.
    """
    base_kwargs = dict(
        namespace=namespace,
        workspace=workspace or "",
        global_config=global_config,
        embedding_func=embedding_func,
        meta_fields=meta_fields or set(),
    )
    super().__init__(**base_kwargs)
    # Qdrant-specific setup (workspace resolution, collection naming, limits).
    self.__post_init__()
@staticmethod
def setup_collection(
    client: QdrantClient,
    collection_name: str,
    namespace: str,
    workspace: str,
    vectors_config: models.VectorParams,
    hnsw_config: models.HnswConfigDiff,
    model_suffix: str,
):
    """
    Set up a Qdrant collection, migrating data from a legacy collection if needed.

    Behaviour, in order:
    - If only the new collection exists (or the new collection *is* the legacy
      one), ensure the workspace payload index exists and return.
    - If the new collection does not exist, check vector dimension
      compatibility against a non-empty legacy collection, then create it.
    - Ensure the workspace payload index exists on the final collection.
    - If a legacy collection exists and is empty, delete it and return.
    - If the new collection already holds data for this workspace, skip
      migration (the legacy collection is left for manual clean-up).
    - Otherwise copy the legacy points into the new collection in batches and
      verify the resulting count.

    Args:
        client: QdrantClient instance
        collection_name: Name of the final collection
        namespace: Base namespace (e.g., "chunks", "entities")
        workspace: Workspace identifier for data isolation
        vectors_config: Vector configuration parameters for the collection
        hnsw_config: HNSW index configuration diff for the collection
        model_suffix: Model suffix of the new collection; forwarded to
            _find_legacy_collection to decide the legacy lookup order

    Raises:
        ValueError: If namespace or workspace is empty.
        DataMigrationError: On a vector dimension mismatch, a failed
            post-migration count check, or any other error during migration.
    """
    if not namespace or not workspace:
        raise ValueError("namespace and workspace must be provided")
    # Reused for every "how many points does this workspace have" count below.
    workspace_count_filter = models.Filter(
        must=[workspace_filter_condition(workspace)]
    )
    new_collection_exists = client.collection_exists(collection_name)
    legacy_collection = _find_legacy_collection(
        client, namespace, workspace, model_suffix
    )
    # Case 1: Only new collection exists or new collection is the same as legacy collection
    # No data migration needed, and ensuring index is created then return
    if (new_collection_exists and not legacy_collection) or (
        collection_name == legacy_collection
    ):
        # create_payload_index return without error if index already exists
        client.create_payload_index(
            collection_name=collection_name,
            field_name=WORKSPACE_ID_FIELD,
            field_schema=models.KeywordIndexParams(
                type=models.KeywordIndexType.KEYWORD,
                is_tenant=True,
            ),
        )
        new_workspace_count = client.count(
            collection_name=collection_name,
            count_filter=workspace_count_filter,
            exact=True,
        ).count
        # Only warn about an empty workspace when the collection is a genuinely
        # new-style one (not the legacy collection reused under the same name).
        if new_workspace_count == 0 and not (collection_name == legacy_collection):
            logger.warning(
                f"Qdrant: workspace data in collection '{collection_name}' is empty. "
                f"Ensure it is caused by new workspace setup and not an unexpected embedding model change."
            )
        return
    # None means "not counted yet"; it is filled lazily to avoid a second count call.
    legacy_count = None
    if not new_collection_exists:
        # Check vector dimension compatibility before creating new collection
        if legacy_collection:
            legacy_count = client.count(
                collection_name=legacy_collection, exact=True
            ).count
            if legacy_count > 0:
                legacy_info = client.get_collection(legacy_collection)
                legacy_dim = legacy_info.config.params.vectors.size
                if vectors_config.size and legacy_dim != vectors_config.size:
                    logger.error(
                        f"Qdrant: Dimension mismatch detected! "
                        f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
                        f"but new embedding model expects {vectors_config.size}d."
                    )
                    raise DataMigrationError(
                        f"Dimension mismatch between legacy collection '{legacy_collection}' "
                        f"and new collection. Expected {vectors_config.size}d but got {legacy_dim}d."
                    )
        client.create_collection(
            collection_name, vectors_config=vectors_config, hnsw_config=hnsw_config
        )
        logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
        if not legacy_collection:
            logger.warning(
                "Qdrant: Ensure this new collection creation is caused by new workspace setup and not an unexpected embedding model change."
            )
    # create_payload_index return without error if index already exists
    client.create_payload_index(
        collection_name=collection_name,
        field_name=WORKSPACE_ID_FIELD,
        field_schema=models.KeywordIndexParams(
            type=models.KeywordIndexType.KEYWORD,
            is_tenant=True,
        ),
    )
    # Case 2: Legacy collection exist
    if legacy_collection:
        # Only drop legacy collection if it's empty
        if legacy_count is None:
            legacy_count = client.count(
                collection_name=legacy_collection, exact=True
            ).count
        if legacy_count == 0:
            client.delete_collection(collection_name=legacy_collection)
            logger.info(
                f"Qdrant: Empty legacy collection '{legacy_collection}' deleted successfully"
            )
            return
        new_workspace_count = client.count(
            collection_name=collection_name,
            count_filter=workspace_count_filter,
            exact=True,
        ).count
        # Skip data migration if new collection already has workspace data
        if new_workspace_count > 0:
            logger.warning(
                f"Qdrant: Both new and legacy collection have data. "
                f"{legacy_count} records in {legacy_collection} require manual deletion after migration verification."
            )
            return
        # Case 3: Only legacy exists - migrate data from legacy collection to new collection
        # Check if legacy collection has workspace_id to determine migration strategy
        # Note: payload_schema only reflects INDEXED fields, so we also sample
        # actual payloads to detect unindexed workspace_id fields
        legacy_info = client.get_collection(legacy_collection)
        has_workspace_index = WORKSPACE_ID_FIELD in (
            legacy_info.payload_schema or {}
        )
        # Detect workspace_id field presence by sampling payloads if not indexed
        # This prevents cross-workspace data leakage when workspace_id exists but isn't indexed
        has_workspace_field = has_workspace_index
        if not has_workspace_index:
            # Sample a small batch of points to check for workspace_id in payloads.
            # NOTE(review): this assumes that if any point carries workspace_id
            # then all points do; a sample of 10 cannot prove that.
            sample_result = client.scroll(
                collection_name=legacy_collection,
                limit=10, # Small sample is sufficient for detection
                with_payload=True,
                with_vectors=False,
            )
            sample_points, _ = sample_result
            for point in sample_points:
                if point.payload and WORKSPACE_ID_FIELD in point.payload:
                    has_workspace_field = True
                    logger.info(
                        f"Qdrant: Detected unindexed {WORKSPACE_ID_FIELD} field "
                        f"in legacy collection '{legacy_collection}' via payload sampling"
                    )
                    break
        # Build workspace filter if legacy collection has workspace support
        # This prevents cross-workspace data leakage during migration
        legacy_scroll_filter = None
        if has_workspace_field:
            legacy_scroll_filter = models.Filter(
                must=[workspace_filter_condition(workspace)]
            )
            # Recount with workspace filter for accurate migration tracking
            legacy_count = client.count(
                collection_name=legacy_collection,
                count_filter=legacy_scroll_filter,
                exact=True,
            ).count
            logger.info(
                f"Qdrant: Legacy collection has workspace support, "
                f"filtering to {legacy_count} records for workspace '{workspace}'"
            )
        logger.info(
            f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records to migrate."
        )
        logger.info(
            f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
        )
        try:
            # Batch migration (500 records per batch)
            migrated_count = 0
            offset = None
            batch_size = 500
            while True:
                # Scroll through legacy data with optional workspace filter
                result = client.scroll(
                    collection_name=legacy_collection,
                    scroll_filter=legacy_scroll_filter,
                    limit=batch_size,
                    offset=offset,
                    with_vectors=True,
                    with_payload=True,
                )
                points, next_offset = result
                if not points:
                    break
                # Transform points for new collection
                new_points = []
                for point in points:
                    # Set workspace_id in payload (copy so the scrolled record is not mutated)
                    new_payload = dict(point.payload or {})
                    new_payload[WORKSPACE_ID_FIELD] = workspace
                    # Create new point with workspace-prefixed ID, matching the
                    # ID scheme used by upsert() so later lookups by ID succeed
                    original_id = new_payload.get(ID_FIELD)
                    if original_id:
                        new_point_id = compute_mdhash_id_for_qdrant(
                            original_id, prefix=workspace
                        )
                    else:
                        # Fallback: use original point ID
                        new_point_id = str(point.id)
                    new_points.append(
                        models.PointStruct(
                            id=new_point_id,
                            vector=point.vector,
                            payload=new_payload,
                        )
                    )
                # Upsert to new collection
                client.upsert(
                    collection_name=collection_name, points=new_points, wait=True
                )
                migrated_count += len(points)
                logger.info(
                    f"Qdrant: {migrated_count}/{legacy_count} records migrated"
                )
                # Check if we've reached the end
                if next_offset is None:
                    break
                offset = next_offset
            # Verify the migration by comparing the workspace count in the new
            # collection against the number of legacy records expected.
            new_count_after = client.count(
                collection_name=collection_name,
                count_filter=workspace_count_filter,
                exact=True,
            ).count
            inserted_count = new_count_after - new_workspace_count
            if inserted_count != legacy_count:
                error_msg = (
                    "Qdrant: Migration verification failed, expected "
                    f"{legacy_count} inserted records, got {inserted_count}."
                )
                logger.error(error_msg)
                raise DataMigrationError(error_msg)
        except DataMigrationError:
            # Re-raise DataMigrationError as-is to preserve specific error messages
            raise
        except Exception as e:
            logger.error(
                f"Qdrant: Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}': {e}"
            )
            raise DataMigrationError(
                f"Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
            ) from e
        logger.info(
            f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
        )
        # The legacy collection is intentionally kept after a successful migration.
        logger.warning(
            "Qdrant: Manual deletion is required after data migration verification."
        )
def __post_init__(self):
    """
    Resolve workspace, collection name and upsert limits for this storage.

    Sets effective_workspace, model_suffix, final_namespace,
    cosine_better_than_threshold and the batching limits. The Qdrant client
    itself is left as None here.

    Raises:
        ValueError: If cosine_better_than_threshold is missing from
            vector_db_storage_cls_kwargs.
    """
    self._validate_embedding_func()

    # A non-blank QDRANT_WORKSPACE environment variable wins over the workspace
    # passed to the constructor, letting administrators pin one workspace.
    env_workspace = os.environ.get("QDRANT_WORKSPACE")
    forced_workspace = env_workspace.strip() if env_workspace else ""
    if forced_workspace:
        effective_workspace = forced_workspace
        logger.info(
            f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding '{self.workspace}/{self.namespace}')"
        )
    else:
        effective_workspace = self.workspace
        if effective_workspace:
            logger.debug(
                f"Using passed workspace parameter: '{effective_workspace}'"
            )
    self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE

    # Collection naming: "lightrag_vdb_<namespace>_<model suffix>" isolates
    # data per embedding model, e.g. "lightrag_vdb_chunks_text_embedding_ada_002_1536d".
    self.model_suffix = self._generate_collection_suffix()
    if not self.model_suffix:
        # No suffix available: fall back to the legacy, un-suffixed name.
        self.final_namespace = f"lightrag_vdb_{self.namespace}"
        logger.warning(
            f"Qdrant collection: {self.final_namespace} missing suffix. Pls add model_name to embedding_func for proper workspace data isolation."
        )
    else:
        self.final_namespace = f"lightrag_vdb_{self.namespace}_{self.model_suffix}"
        logger.info(f"Qdrant collection: {self.final_namespace}")

    storage_kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
    threshold = storage_kwargs.get("cosine_better_than_threshold")
    if threshold is None:
        raise ValueError(
            "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
        )
    self.cosine_better_than_threshold = threshold

    # The client is created later, in initialize().
    self._client = None
    self._max_batch_size = self.global_config["embedding_batch_num"]

    payload_limit_raw = os.getenv(
        "QDRANT_UPSERT_MAX_PAYLOAD_BYTES",
        str(DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES),
    )
    self._max_upsert_payload_bytes = int(payload_limit_raw)
    points_limit_raw = os.getenv(
        "QDRANT_UPSERT_MAX_POINTS_PER_BATCH",
        str(DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH),
    )
    self._max_upsert_points_per_batch = int(points_limit_raw)

    # Non-positive limits are accepted and mean "do not split on this criterion".
    if self._max_upsert_payload_bytes <= 0:
        logger.warning(
            f"QDRANT_UPSERT_MAX_PAYLOAD_BYTES={self._max_upsert_payload_bytes} is non-positive, disable payload-size splitting"
        )
    if self._max_upsert_points_per_batch <= 0:
        logger.warning(
            f"QDRANT_UPSERT_MAX_POINTS_PER_BATCH={self._max_upsert_points_per_batch} is non-positive, disable point-count splitting"
        )
    self._initialized = False
@staticmethod
def _to_json_serializable(value: Any) -> Any:
    """
    Convert nested values to JSON-serializable types for payload size estimation.

    NumPy arrays become lists and NumPy boolean/integer/floating scalars become
    the matching Python scalars. Dicts (keys stringified), lists and tuples are
    converted recursively; tuples come back as lists. Any other value is
    returned unchanged.

    Args:
        value: Arbitrary (possibly nested) payload or vector value.

    Returns:
        An equivalent structure built from plain Python types where a
        conversion is known.
    """
    convert = QdrantVectorDBStorage._to_json_serializable
    if isinstance(value, np.ndarray):
        return value.tolist()
    # np.bool_ is neither np.integer nor np.floating; without this branch it
    # would pass through unchanged and make json.dumps raise TypeError.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, dict):
        return {str(key): convert(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [convert(item) for item in value]
    return value
@staticmethod
def _estimate_point_payload_bytes(point: models.PointStruct) -> int:
    """
    Estimate the UTF-8 byte size of one point when serialized as compact JSON.

    The point is represented as {"id", "vector", "payload"}; a missing payload
    counts as an empty object.
    """
    to_plain = QdrantVectorDBStorage._to_json_serializable
    serialized = json.dumps(
        {
            "id": point.id,
            "vector": to_plain(point.vector),
            "payload": to_plain(point.payload or {}),
        },
        ensure_ascii=False,
        # Compact separators: no whitespace between items.
        separators=(",", ":"),
    )
    return len(serialized.encode("utf-8"))
@staticmethod
def _build_upsert_batches(
    points: list[models.PointStruct],
    max_payload_bytes: int,
    max_points_per_batch: int,
) -> list[tuple[list[models.PointStruct], int]]:
    """
    Split points into upsert batches bounded by payload size and point count.

    Args:
        points: Points to split, kept in their original order.
        max_payload_bytes: Upper bound on a batch's estimated JSON size;
            a non-positive value disables size-based splitting.
        max_points_per_batch: Upper bound on points per batch; a
            non-positive value disables count-based splitting.

    Returns:
        A list of (batch, estimated_bytes) tuples.

    Raises:
        ValueError: If a single point alone exceeds the payload limit.
    """
    if not points:
        return []

    unlimited = float("inf")
    byte_cap = max_payload_bytes if max_payload_bytes > 0 else unlimited
    count_cap = max_points_per_batch if max_points_per_batch > 0 else unlimited

    # Size of the enclosing JSON array brackets ("[]").
    array_overhead = 2
    estimate = QdrantVectorDBStorage._estimate_point_payload_bytes

    finished: list[tuple[list[models.PointStruct], int]] = []
    pending: list[models.PointStruct] = []
    pending_bytes = array_overhead

    for point in points:
        size = estimate(point)
        standalone_size = size + array_overhead
        if standalone_size > byte_cap:
            offending_id = str(point.id)
            raise ValueError(
                f"Single Qdrant point exceeds payload limit: id={offending_id}, "
                f"estimated_bytes={standalone_size}, "
                f"limit={int(byte_cap)}"
            )

        if pending:
            # One extra byte for the comma separating this point from the last.
            projected_bytes = pending_bytes + 1 + size
            if len(pending) >= count_cap or projected_bytes > byte_cap:
                # Close the current batch and start a new one with this point.
                finished.append((pending, pending_bytes))
                pending = []
                projected_bytes = array_overhead + size
        else:
            projected_bytes = pending_bytes + size

        pending.append(point)
        pending_bytes = projected_bytes

    if pending:
        finished.append((pending, pending_bytes))
    return finished
async def initialize(self):
    """
    Create the Qdrant client if needed and prepare the collection.

    Runs under the shared data-init lock and does nothing once the instance
    has been marked initialized. Any failure is logged and re-raised.
    """
    async with get_data_init_lock():
        if self._initialized:
            return
        try:
            if self._client is None:
                # Environment variables take precedence over config.ini values.
                qdrant_url = os.environ.get(
                    "QDRANT_URL", config.get("qdrant", "uri", fallback=None)
                )
                qdrant_api_key = os.environ.get(
                    "QDRANT_API_KEY",
                    config.get("qdrant", "apikey", fallback=None),
                )
                self._client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)
                logger.debug(
                    f"[{self.workspace}] QdrantClient created successfully"
                )

            vector_params = models.VectorParams(
                size=self.embedding_func.embedding_dim,
                distance=models.Distance.COSINE,
            )
            # NOTE(review): m=0 with payload_m=16 presumably follows Qdrant's
            # multitenancy guidance (per-tenant index only) - confirm.
            hnsw_params = models.HnswConfigDiff(
                payload_m=16,
                m=0,
            )
            # Creates the collection if missing, ensures the workspace index
            # and migrates legacy data where applicable.
            QdrantVectorDBStorage.setup_collection(
                self._client,
                self.final_namespace,
                namespace=self.namespace,
                workspace=self.effective_workspace,
                vectors_config=vector_params,
                hnsw_config=hnsw_params,
                model_suffix=self.model_suffix,
            )
            self._initialized = True
            logger.info(
                f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
            )
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Failed to initialize Qdrant collection '{self.namespace}': {e}"
            )
            raise
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """
    Embed the given records and upsert them into the workspace's collection.

    Args:
        data: Mapping of record ID to record dict. Each record must have a
            "content" entry (the text to embed); only keys listed in
            meta_fields are copied into the stored payload.

    Returns:
        Nothing when data is empty; otherwise the result of the last batch
        upsert call is returned as-is.
    """
    logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
    if not data:
        return
    import time

    now_ts = int(time.time())

    # One payload per record: bookkeeping fields first, then whitelisted metadata.
    payloads = []
    for record_id, record in data.items():
        payload = {
            ID_FIELD: record_id,
            WORKSPACE_ID_FIELD: self.effective_workspace,
            CREATED_AT_FIELD: now_ts,
        }
        payload.update(
            {field: val for field, val in record.items() if field in self.meta_fields}
        )
        payloads.append(payload)

    # Embed contents in chunks of at most _max_batch_size, concurrently.
    texts = [record["content"] for record in data.values()]
    step = self._max_batch_size
    text_batches = [
        texts[start : start + step] for start in range(0, len(texts), step)
    ]
    embedding_tasks = [
        self.embedding_func(batch, context="document") for batch in text_batches
    ]
    embeddings_list = await asyncio.gather(*embedding_tasks)
    embeddings = np.concatenate(embeddings_list)

    # Point IDs are workspace-prefixed hashes of the caller-supplied IDs.
    list_points = [
        models.PointStruct(
            id=compute_mdhash_id_for_qdrant(
                payload[ID_FIELD], prefix=self.effective_workspace
            ),
            vector=embeddings[position],
            payload=payload,
        )
        for position, payload in enumerate(payloads)
    ]

    point_batches = self._build_upsert_batches(
        list_points,
        max_payload_bytes=self._max_upsert_payload_bytes,
        max_points_per_batch=self._max_upsert_points_per_batch,
    )
    batch_total = len(point_batches)
    if batch_total > 1:
        logger.info(
            f"[{self.workspace}] Qdrant upsert split into {len(point_batches)} batches "
            f"for {len(list_points)} points (max_payload_bytes={self._max_upsert_payload_bytes}, "
            f"max_points_per_batch={self._max_upsert_points_per_batch})"
        )

    results = None
    batch_index = 0
    for points_batch, estimated_bytes in point_batches:
        batch_index += 1
        logger.debug(
            f"[{self.workspace}] Qdrant upsert batch {batch_index}/{len(point_batches)}: "
            f"points={len(points_batch)}, estimated_payload_bytes={estimated_bytes}"
        )
        # Fail-fast: an exception here propagates and skips the remaining batches.
        results = self._client.upsert(
            collection_name=self.final_namespace,
            points=points_batch,
            wait=True,
        )
    return results
async def query(
    self, query: str, top_k: int, query_embedding: list[float] = None
) -> list[dict[str, Any]]:
    """
    Return up to top_k points of this workspace most similar to the query.

    Args:
        query: Query text; embedded only when query_embedding is not given.
        top_k: Maximum number of results.
        query_embedding: Optional pre-computed embedding used instead of
            embedding the query text.

    Returns:
        One dict per hit: the stored payload plus "distance" (the score
        reported by Qdrant) and the created_at field (None when absent).
    """
    if query_embedding is None:
        # Queries are embedded with a raised priority.
        embedded = await self.embedding_func(
            [query], context="query", _priority=5
        )
        embedding = embedded[0]
    else:
        embedding = query_embedding

    response = self._client.query_points(
        collection_name=self.final_namespace,
        query=embedding,
        limit=top_k,
        with_payload=True,
        score_threshold=self.cosine_better_than_threshold,
        query_filter=models.Filter(
            must=[workspace_filter_condition(self.effective_workspace)]
        ),
    )

    hits = []
    for hit in response.points:
        hits.append(
            {
                **hit.payload,
                "distance": hit.score,
                CREATED_AT_FIELD: hit.payload.get(CREATED_AT_FIELD),
            }
        )
    return hits
async def index_done_callback(self) -> None:
    """No-op: Qdrant handles persistence automatically, so nothing is flushed here."""
    return None
async def delete(self, ids: List[str]) -> None:
    """Delete the vectors with the given IDs from this workspace.

    Errors are logged and not re-raised.

    Args:
        ids: List of vector IDs to be deleted
    """
    try:
        if not ids:
            return
        # Point IDs are workspace-prefixed hashes, so the derived IDs can only
        # address points written for this workspace.
        workspace = self.effective_workspace
        point_ids = [
            compute_mdhash_id_for_qdrant(raw_id, prefix=workspace) for raw_id in ids
        ]
        selector = models.PointIdsList(points=point_ids)
        self._client.delete(
            collection_name=self.final_namespace,
            points_selector=selector,
            wait=True,
        )
        logger.debug(
            f"[{self.workspace}] Successfully deleted {len(ids)} vectors from {self.namespace}"
        )
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error while deleting vectors from {self.namespace}: {e}"
        )
async def delete_entity(self, entity_name: str) -> None:
    """Delete a single entity, looked up by name, from this workspace.

    Errors are logged and not re-raised.

    Args:
        entity_name: Name of the entity to delete
    """
    try:
        entity_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
        logger.debug(
            f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}"
        )

        # Locate the point through its payload ID field (within this workspace)
        # instead of reconstructing the Qdrant point ID.
        entity_filter = models.Filter(
            must=[
                workspace_filter_condition(self.effective_workspace),
                models.FieldCondition(
                    key=ID_FIELD, match=models.MatchValue(value=entity_id)
                ),
            ]
        )
        scroll_result = self._client.scroll(
            collection_name=self.final_namespace,
            scroll_filter=entity_filter,
            with_payload=False,
            limit=1,
        )
        matches = scroll_result[0]

        if not matches:
            logger.debug(
                f"[{self.workspace}] Entity {entity_name} not found in storage"
            )
            return

        self._client.delete(
            collection_name=self.final_namespace,
            points_selector=models.PointIdsList(
                points=[match.id for match in matches]
            ),
            wait=True,
        )
        logger.debug(
            f"[{self.workspace}] Successfully deleted entity {entity_name}"
        )
    except Exception as e:
        logger.error(f"[{self.workspace}] Error deleting entity {entity_name}: {e}")
async def delete_entity_relation(self, entity_name: str) -> None:
    """Delete every relation in this workspace that references an entity.

    A relation matches when its src_id or tgt_id payload field equals the
    entity name. Errors are logged and not re-raised.

    Args:
        entity_name: Name of the entity whose relations should be deleted
    """
    try:
        name_match = models.MatchValue(value=entity_name)
        # must + should = workspace matches AND (src_id matches OR tgt_id matches)
        relation_filter = models.Filter(
            must=[workspace_filter_condition(self.effective_workspace)],
            should=[
                models.FieldCondition(key="src_id", match=name_match),
                models.FieldCondition(key="tgt_id", match=name_match),
            ],
        )

        # Walk all matches page by page so large result sets are handled.
        page_size = 1000
        cursor = None
        total_deleted = 0
        while True:
            # Only point IDs are needed, so payloads and vectors are skipped.
            page, next_cursor = self._client.scroll(
                collection_name=self.final_namespace,
                scroll_filter=relation_filter,
                with_payload=False,
                with_vectors=False,
                limit=page_size,
                offset=cursor,
            )
            if not page:
                break

            page_ids = [relation.id for relation in page]
            self._client.delete(
                collection_name=self.final_namespace,
                points_selector=models.PointIdsList(points=page_ids),
                wait=True,
            )
            total_deleted += len(page_ids)

            if next_cursor is None:
                break
            cursor = next_cursor

        if total_deleted > 0:
            logger.debug(
                f"[{self.workspace}] Deleted {total_deleted} relations for {entity_name}"
            )
        else:
            logger.debug(
                f"[{self.workspace}] No relations found for entity {entity_name}"
            )
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error deleting relations for {entity_name}: {e}"
        )
async def get_by_id(self, id: str) -> dict[str, Any] | None:
    """Fetch the stored payload for a single ID.

    Args:
        id: The unique identifier of the vector

    Returns:
        The payload if found (with created_at set to None when it was
        absent), or None when not found or when the lookup fails
    """
    try:
        # The point ID is the workspace-prefixed hash of the caller's ID.
        point_id = compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
        records = self._client.retrieve(
            collection_name=self.final_namespace,
            ids=[point_id],
            with_payload=True,
        )
        if not records:
            return None

        payload = records[0].payload
        if CREATED_AT_FIELD not in payload:
            payload[CREATED_AT_FIELD] = None
        return payload
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error retrieving vector data for ID {id}: {e}"
        )
        return None
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
    """Get multiple vector data by their IDs

    Args:
        ids: List of unique identifiers

    Returns:
        One entry per requested ID, in the same order as ``ids``. An entry is
        the stored payload (with created_at set to None when absent) or None
        when no matching point was found. An empty list is returned when
        ``ids`` is empty or when the lookup fails.
    """
    if not ids:
        return []

    def _canonical_point_id(point_id: Any) -> str:
        # IDs are requested in un-hyphenated hex form, while the server may
        # report UUIDs hyphenated; normalize so both forms compare equal.
        return str(point_id).replace("-", "").lower()

    try:
        # Convert to Qdrant compatible IDs
        qdrant_ids = [
            compute_mdhash_id_for_qdrant(raw_id, prefix=self.effective_workspace)
            for raw_id in ids
        ]
        # Retrieve the points by IDs
        results = self._client.retrieve(
            collection_name=self.final_namespace,
            ids=qdrant_ids,
            with_payload=True,
        )

        # Index the results two ways: by the original ID stored in the payload
        # (preferred) and by the point ID (fallback for payloads without one).
        payload_by_original_id: dict[str, dict[str, Any]] = {}
        payload_by_qdrant_id: dict[str, dict[str, Any]] = {}
        for point in results:
            payload = dict(point.payload or {})
            if CREATED_AT_FIELD not in payload:
                payload[CREATED_AT_FIELD] = None
            if point.id is not None:
                point_key = _canonical_point_id(point.id)
                if point_key:
                    payload_by_qdrant_id[point_key] = payload
            original_id = payload.get(ID_FIELD)
            if original_id is not None:
                payload_by_original_id[str(original_id)] = payload

        # Rebuild the caller's ordering; missing IDs yield None placeholders.
        ordered_payloads: list[dict[str, Any] | None] = []
        for requested_id, qdrant_id in zip(ids, qdrant_ids):
            payload = payload_by_original_id.get(str(requested_id))
            if payload is None:
                payload = payload_by_qdrant_id.get(_canonical_point_id(qdrant_id))
            ordered_payloads.append(payload)
        return ordered_payloads
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}"
        )
        return []
async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
    """Fetch the embedding vectors for the given IDs.

    Args:
        ids: List of unique identifiers

    Returns:
        Dictionary mapping each found original ID to its vector:
        {id: [vector_values], ...}. Points lacking a vector, a payload or an
        original ID are skipped. An empty dict is returned when ``ids`` is
        empty or when the lookup fails.
    """
    if not ids:
        return {}
    try:
        workspace = self.effective_workspace
        point_ids = [
            compute_mdhash_id_for_qdrant(raw_id, prefix=workspace) for raw_id in ids
        ]
        records = self._client.retrieve(
            collection_name=self.final_namespace,
            ids=point_ids,
            with_vectors=True,  # vectors must be requested explicitly here
            with_payload=True,
        )

        vectors_by_id = {}
        for record in records:
            if not record or record.vector is None or not record.payload:
                continue
            # Results are keyed by the caller's original ID from the payload.
            source_id = record.payload.get(ID_FIELD)
            if not source_id:
                continue
            vector = record.vector
            if isinstance(vector, np.ndarray):
                vector = vector.tolist()
            vectors_by_id[source_id] = vector
        return vectors_by_id
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error retrieving vectors by IDs from {self.namespace}: {e}"
        )
        return {}
async def drop(self) -> dict[str, str]:
    """Delete all points of the current workspace from the collection.

    The collection itself is kept; only points matching this workspace are
    removed.

    Returns:
        dict[str, str]: Operation status and message
        - On success: {"status": "success", "message": "data dropped"}
        - On failure: {"status": "error", "message": "<error details>"}
    """
    # No lock is taken here: only one process is expected to hold the pipeline at a time.
    workspace_selector_kwargs = {"must": None}
    try:
        workspace_selector_kwargs["must"] = [
            workspace_filter_condition(self.effective_workspace)
        ]
        selector = models.FilterSelector(
            filter=models.Filter(**workspace_selector_kwargs)
        )
        self._client.delete(
            collection_name=self.final_namespace,
            points_selector=selector,
            wait=True,
        )
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
        )
        return {"status": "error", "message": str(e)}
    else:
        try:
            logger.info(
                f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
            )
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
            )
            return {"status": "error", "message": str(e)}