主要变更: - 移除Hermes智能体及相关回调服务 - 新增知识库RAG、同步、调度、规范化和索引任务服务 - 重构orchestrator服务,增强运行时聊天功能 - 更新前端聊天、政策制度、设置等页面样式和逻辑 - 更新expense_claims和document_intelligence服务 - 删除llm_wiki相关服务和测试文件 - 更新docker-compose配置和启动脚本
1052 lines
41 KiB
Python
1052 lines
41 KiB
Python
import asyncio
|
|
import configparser
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any, List, final
|
|
|
|
import numpy as np
|
|
import pipmaster as pm
|
|
|
|
from ..base import BaseVectorStorage
|
|
from ..exceptions import DataMigrationError
|
|
from ..kg.shared_storage import get_data_init_lock
|
|
from ..utils import compute_mdhash_id, logger
|
|
|
|
if not pm.is_installed("qdrant-client"):
|
|
pm.install("qdrant-client")
|
|
|
|
from qdrant_client import QdrantClient, models # type: ignore
|
|
|
|
# Workspace identifier used when neither QDRANT_WORKSPACE nor the
# constructor's ``workspace`` argument supplies one.
DEFAULT_WORKSPACE = "_"

# Payload keys written into every Qdrant point by this module.
WORKSPACE_ID_FIELD = "workspace_id"
CREATED_AT_FIELD = "created_at"
ID_FIELD = "id"

# Prefix passed to compute_mdhash_id when deriving an entity ID from its name.
ENTITY_PREFIX = "ent-"

# Default upsert batching limits; the storage class lets the environment
# variables QDRANT_UPSERT_MAX_PAYLOAD_BYTES / QDRANT_UPSERT_MAX_POINTS_PER_BATCH
# override them.
DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES = 16 << 20  # 16MB
DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH = 128

# Optional file-based settings (section "qdrant", keys "uri" / "apikey"),
# read from ./config.ini relative to the working directory.
# ConfigParser.read skips files that do not exist, so this is best-effort.
config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")
|
|
|
|
|
|
def compute_mdhash_id_for_qdrant(
    content: str, prefix: str = "", style: str = "simple"
) -> str:
    """
    Derive a deterministic, Qdrant-compatible UUID string from ``prefix + content``.

    The first 16 bytes of the SHA-256 digest of the UTF-8 encoded
    ``prefix + content`` are used as the UUID bytes (with the version field
    set to 4). The same inputs therefore always produce the same point ID.

    :param content: Text to hash; must be non-empty.
    :param prefix: String prepended to ``content`` before hashing, so the same
        content under different prefixes yields different IDs.
    :param style: Output format: "simple" (32 hex characters), "hyphenated"
        (canonical 8-4-4-4-12 form) or "urn" ("urn:uuid:" + hyphenated form).
    :return: The UUID rendered in the requested style.
    :raises ValueError: If ``content`` is empty or ``style`` is not recognised.
    """
    if not content:
        raise ValueError("Content must not be empty.")

    digest = hashlib.sha256((prefix + content).encode("utf-8")).digest()
    point_uuid = uuid.UUID(bytes=digest[:16], version=4)
    canonical = str(point_uuid)

    if style == "hyphenated":
        return canonical
    if style == "urn":
        return "urn:uuid:" + canonical
    if style == "simple":
        return point_uuid.hex
    raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
|
|
|
|
|
|
def workspace_filter_condition(workspace: str) -> models.FieldCondition:
    """
    Build the payload condition that limits a Qdrant operation to one workspace.

    The condition matches points whose ``workspace_id`` payload value equals
    ``workspace`` exactly. Callers in this module place it inside
    ``models.Filter(must=[...])``.
    """
    exact_match = models.MatchValue(value=workspace)
    return models.FieldCondition(key=WORKSPACE_ID_FIELD, match=exact_match)
|
|
|
|
|
|
def _find_legacy_collection(
    client: QdrantClient,
    namespace: str,
    workspace: str | None = None,
    model_suffix: str | None = None,
) -> str | None:
    """
    Locate a collection created by an older LightRAG naming scheme.

    Candidate names are probed in the order below, and the first one that
    exists is returned:

    1. ``lightrag_vdb_{namespace}``, only when ``model_suffix`` is truthy.
    2. ``{workspace}_{namespace}``, only when ``workspace`` is truthy.
    3. ``lightrag_vdb_{namespace}``, if not already probed in step 1.
    4. ``{namespace}``.

    Each distinct name is probed at most once.

    Args:
        client: QdrantClient instance.
        namespace: Base namespace (e.g. "chunks", "entities").
        workspace: Optional workspace identifier.
        model_suffix: Optional model suffix of the new collection. When set,
            the un-suffixed ``lightrag_vdb_{namespace}`` name is tried first.

    Returns:
        The name of the first existing candidate collection, or None.
    """
    legacy_prefixed = f"lightrag_vdb_{namespace}"
    candidates = [
        legacy_prefixed if model_suffix else None,
        f"{workspace}_{namespace}" if workspace else None,
        legacy_prefixed,
        namespace,
    ]

    # dict.fromkeys removes duplicates while keeping first-seen order. Without
    # it, legacy_prefixed would be probed twice whenever model_suffix is set.
    for candidate in dict.fromkeys(c for c in candidates if c):
        if client.collection_exists(candidate):
            logger.info(
                f"Qdrant: Found legacy collection '{candidate}' "
                f"(namespace={namespace}, workspace={workspace or 'none'})"
            )
            return candidate

    return None
|
|
|
|
|
|
@final
@dataclass
class QdrantVectorDBStorage(BaseVectorStorage):
    """Qdrant-backed vector storage with payload-based workspace isolation.

    All workspaces that share a namespace and embedding model write into the
    same collection (``final_namespace``). Their rows are kept apart in two
    ways: by the ``workspace_id`` payload field, which query/scroll/drop
    operations filter on, and by point IDs derived from
    ``workspace + original id``.
    """

    def __init__(
        self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
    ):
        # Normalise the optional arguments (workspace -> "", meta_fields -> set())
        # before delegating to the base class, then run the Qdrant-specific setup.
        # NOTE(review): if the base dataclass __init__ already invokes
        # __post_init__, the explicit call below runs it a second time. Confirm.
        super().__init__(
            namespace=namespace,
            workspace=workspace or "",
            global_config=global_config,
            embedding_func=embedding_func,
            meta_fields=meta_fields or set(),
        )
        self.__post_init__()

    @staticmethod
    def _ensure_workspace_index(client: QdrantClient, collection_name: str) -> None:
        """Create the tenant keyword index on ``workspace_id`` for ``collection_name``."""
        # create_payload_index returns without error if the index already exists,
        # so this is safe to call on every start-up.
        client.create_payload_index(
            collection_name=collection_name,
            field_name=WORKSPACE_ID_FIELD,
            field_schema=models.KeywordIndexParams(
                type=models.KeywordIndexType.KEYWORD,
                is_tenant=True,
            ),
        )

    @staticmethod
    def _exact_count(
        client: QdrantClient,
        collection_name: str,
        count_filter: models.Filter | None = None,
    ) -> int:
        """Return the exact number of points in ``collection_name`` matching ``count_filter``."""
        return client.count(
            collection_name=collection_name,
            count_filter=count_filter,
            exact=True,
        ).count

    @staticmethod
    def _legacy_has_workspace_field(client: QdrantClient, legacy_collection: str) -> bool:
        """Report whether ``legacy_collection`` stores ``workspace_id`` in its payloads.

        ``payload_schema`` only reflects indexed fields. When the field is not
        indexed, a small sample of points is inspected as well, so that an
        unindexed ``workspace_id`` still triggers per-workspace filtering.
        """
        legacy_info = client.get_collection(legacy_collection)
        if WORKSPACE_ID_FIELD in (legacy_info.payload_schema or {}):
            return True

        # Assumes that if any point carries workspace_id, all points do.
        sample_points, _ = client.scroll(
            collection_name=legacy_collection,
            limit=10,  # Small sample is sufficient for detection
            with_payload=True,
            with_vectors=False,
        )
        for point in sample_points:
            if point.payload and WORKSPACE_ID_FIELD in point.payload:
                logger.info(
                    f"Qdrant: Detected unindexed {WORKSPACE_ID_FIELD} field "
                    f"in legacy collection '{legacy_collection}' via payload sampling"
                )
                return True
        return False

    @staticmethod
    def setup_collection(
        client: QdrantClient,
        collection_name: str,
        namespace: str,
        workspace: str,
        vectors_config: models.VectorParams,
        hnsw_config: models.HnswConfigDiff,
        model_suffix: str,
    ):
        """
        Prepare ``collection_name`` for use, migrating data from a legacy collection if needed.

        Behaviour:
        - The final collection always ends up with the tenant index on ``workspace_id``.
        - Before the final collection is created, the vector size of a non-empty
          legacy collection is compared with ``vectors_config.size``.
        - An empty legacy collection is deleted.
        - Data is copied from a legacy collection only when the final collection
          holds no points for ``workspace``. A non-empty legacy collection is
          never deleted automatically.

        Args:
            client: QdrantClient instance.
            collection_name: Name of the final collection.
            namespace: Base namespace (e.g., "chunks", "entities").
            workspace: Workspace identifier for data isolation.
            vectors_config: Vector configuration used if the collection is created.
            hnsw_config: HNSW index configuration diff used if the collection is created.
            model_suffix: Embedding-model suffix of ``collection_name``. It is
                forwarded to ``_find_legacy_collection`` to choose legacy name candidates.

        Raises:
            ValueError: If ``namespace`` or ``workspace`` is empty.
            DataMigrationError: On a vector dimension mismatch, on a failed copy,
                or when the post-copy count differs from the expected record count.
        """
        if not namespace or not workspace:
            raise ValueError("namespace and workspace must be provided")

        workspace_count_filter = models.Filter(
            must=[workspace_filter_condition(workspace)]
        )

        new_collection_exists = client.collection_exists(collection_name)
        legacy_collection = _find_legacy_collection(
            client, namespace, workspace, model_suffix
        )

        # Case 1: only the new collection exists, or the "legacy" collection is
        # the new collection itself. No migration; just ensure the index.
        if (new_collection_exists and not legacy_collection) or (
            collection_name == legacy_collection
        ):
            QdrantVectorDBStorage._ensure_workspace_index(client, collection_name)
            new_workspace_count = QdrantVectorDBStorage._exact_count(
                client, collection_name, workspace_count_filter
            )
            if new_workspace_count == 0 and collection_name != legacy_collection:
                logger.warning(
                    f"Qdrant: workspace data in collection '{collection_name}' is empty. "
                    f"Ensure it is caused by new workspace setup and not an unexpected embedding model change."
                )
            return

        legacy_count = None
        if not new_collection_exists:
            # Check vector dimension compatibility before creating the new collection
            if legacy_collection:
                legacy_count = QdrantVectorDBStorage._exact_count(
                    client, legacy_collection
                )
                if legacy_count > 0:
                    legacy_info = client.get_collection(legacy_collection)
                    legacy_dim = legacy_info.config.params.vectors.size
                    if vectors_config.size and legacy_dim != vectors_config.size:
                        logger.error(
                            f"Qdrant: Dimension mismatch detected! "
                            f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
                            f"but new embedding model expects {vectors_config.size}d."
                        )
                        raise DataMigrationError(
                            f"Dimension mismatch between legacy collection '{legacy_collection}' "
                            f"and new collection. Expected {vectors_config.size}d but got {legacy_dim}d."
                        )

            client.create_collection(
                collection_name, vectors_config=vectors_config, hnsw_config=hnsw_config
            )
            logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
            if not legacy_collection:
                logger.warning(
                    "Qdrant: Ensure this new collection creation is caused by new workspace setup and not an unexpected embedding model change."
                )

        QdrantVectorDBStorage._ensure_workspace_index(client, collection_name)

        if not legacy_collection:
            return

        # Case 2: a legacy collection exists alongside the new one.
        if legacy_count is None:
            legacy_count = QdrantVectorDBStorage._exact_count(client, legacy_collection)
        # Only an empty legacy collection is safe to drop automatically.
        if legacy_count == 0:
            client.delete_collection(collection_name=legacy_collection)
            logger.info(
                f"Qdrant: Empty legacy collection '{legacy_collection}' deleted successfully"
            )
            return

        new_workspace_count = QdrantVectorDBStorage._exact_count(
            client, collection_name, workspace_count_filter
        )
        # Skip data migration if the new collection already has workspace data
        if new_workspace_count > 0:
            logger.warning(
                f"Qdrant: Both new and legacy collection have data. "
                f"{legacy_count} records in {legacy_collection} require manual deletion after migration verification."
            )
            return

        # Case 3: migrate data from the legacy collection into the new collection.
        # If the legacy collection is itself workspace-aware, copy only this
        # workspace's rows to prevent cross-workspace data leakage.
        legacy_scroll_filter = None
        if QdrantVectorDBStorage._legacy_has_workspace_field(client, legacy_collection):
            legacy_scroll_filter = models.Filter(
                must=[workspace_filter_condition(workspace)]
            )
            # Recount with the workspace filter for accurate migration tracking
            legacy_count = QdrantVectorDBStorage._exact_count(
                client, legacy_collection, legacy_scroll_filter
            )
            logger.info(
                f"Qdrant: Legacy collection has workspace support, "
                f"filtering to {legacy_count} records for workspace '{workspace}'"
            )

        logger.info(
            f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records to migrate."
        )
        logger.info(
            f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
        )

        # NOTE: if the copy fails part-way, points already upserted stay in the
        # new collection. On the next start new_workspace_count > 0, so migration
        # is skipped. A failed run therefore needs manual cleanup before a retry.
        try:
            migrated_count = 0
            offset = None
            batch_size = 500

            while True:
                points, next_offset = client.scroll(
                    collection_name=legacy_collection,
                    scroll_filter=legacy_scroll_filter,
                    limit=batch_size,
                    offset=offset,
                    with_vectors=True,
                    with_payload=True,
                )
                if not points:
                    break

                new_points = []
                for point in points:
                    new_payload = dict(point.payload or {})
                    new_payload[WORKSPACE_ID_FIELD] = workspace

                    # Re-key with the workspace-prefixed ID scheme used by upsert();
                    # fall back to the legacy point ID if the payload has no id.
                    original_id = new_payload.get(ID_FIELD)
                    if original_id:
                        new_point_id = compute_mdhash_id_for_qdrant(
                            original_id, prefix=workspace
                        )
                    else:
                        new_point_id = str(point.id)

                    new_points.append(
                        models.PointStruct(
                            id=new_point_id,
                            vector=point.vector,
                            payload=new_payload,
                        )
                    )

                client.upsert(
                    collection_name=collection_name, points=new_points, wait=True
                )

                migrated_count += len(points)
                logger.info(
                    f"Qdrant: {migrated_count}/{legacy_count} records migrated"
                )

                if next_offset is None:
                    break
                offset = next_offset

            new_count_after = QdrantVectorDBStorage._exact_count(
                client, collection_name, workspace_count_filter
            )
            inserted_count = new_count_after - new_workspace_count
            if inserted_count != legacy_count:
                error_msg = (
                    "Qdrant: Migration verification failed, expected "
                    f"{legacy_count} inserted records, got {inserted_count}."
                )
                logger.error(error_msg)
                raise DataMigrationError(error_msg)

        except DataMigrationError:
            # Re-raise DataMigrationError as-is to preserve specific error messages
            raise
        except Exception as e:
            logger.error(
                f"Qdrant: Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}': {e}"
            )
            raise DataMigrationError(
                f"Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
            ) from e

        logger.info(
            f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
        )
        logger.warning(
            "Qdrant: Manual deletion is required after data migration verification."
        )

    def __post_init__(self):
        """Resolve workspace, collection name, thresholds and upsert batching limits.

        Raises:
            ValueError: If ``cosine_better_than_threshold`` is missing from
                ``global_config["vector_db_storage_cls_kwargs"]``, or if an upsert
                limit environment variable is not an integer.
        """
        self._validate_embedding_func()

        # QDRANT_WORKSPACE, when non-blank, overrides the workspace passed in so
        # that administrators can force one workspace for all Qdrant storages.
        qdrant_workspace = os.environ.get("QDRANT_WORKSPACE")
        if qdrant_workspace and qdrant_workspace.strip():
            effective_workspace = qdrant_workspace.strip()
            logger.info(
                f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding '{self.workspace}/{self.namespace}')"
            )
        else:
            effective_workspace = self.workspace
            if effective_workspace:
                logger.debug(
                    f"Using passed workspace parameter: '{effective_workspace}'"
                )

        self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE

        # Collection name embeds the embedding model so different models never
        # share a collection, e.g. "lightrag_vdb_chunks_text_embedding_ada_002_1536d".
        self.model_suffix = self._generate_collection_suffix()
        if self.model_suffix:
            self.final_namespace = f"lightrag_vdb_{self.namespace}_{self.model_suffix}"
            logger.info(f"Qdrant collection: {self.final_namespace}")
        else:
            # Fallback: use legacy namespace if model_suffix is unavailable
            self.final_namespace = f"lightrag_vdb_{self.namespace}"
            logger.warning(
                f"Qdrant collection: {self.final_namespace} missing suffix. Pls add model_name to embedding_func for proper workspace data isolation."
            )

        kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
        cosine_threshold = kwargs.get("cosine_better_than_threshold")
        if cosine_threshold is None:
            raise ValueError(
                "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
            )
        self.cosine_better_than_threshold = cosine_threshold

        # The client is created lazily in initialize()
        self._client = None
        self._max_batch_size = self.global_config["embedding_batch_num"]
        self._max_upsert_payload_bytes = int(
            os.getenv(
                "QDRANT_UPSERT_MAX_PAYLOAD_BYTES",
                str(DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES),
            )
        )
        self._max_upsert_points_per_batch = int(
            os.getenv(
                "QDRANT_UPSERT_MAX_POINTS_PER_BATCH",
                str(DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH),
            )
        )
        # Non-positive limits disable the corresponding split in _build_upsert_batches
        if self._max_upsert_payload_bytes <= 0:
            logger.warning(
                f"QDRANT_UPSERT_MAX_PAYLOAD_BYTES={self._max_upsert_payload_bytes} is non-positive, disable payload-size splitting"
            )
        if self._max_upsert_points_per_batch <= 0:
            logger.warning(
                f"QDRANT_UPSERT_MAX_POINTS_PER_BATCH={self._max_upsert_points_per_batch} is non-positive, disable point-count splitting"
            )
        self._initialized = False

    @staticmethod
    def _to_json_serializable(value: Any) -> Any:
        """Recursively convert ``value`` into plain JSON types for payload size estimation.

        numpy arrays become lists and numpy scalars become Python scalars. Dict
        keys become strings, and lists/tuples/sets become lists. Any other value
        is returned unchanged.
        """
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            # Covers np.integer / np.floating and also np.bool_, which the json
            # module cannot serialise directly.
            return value.item()
        if isinstance(value, dict):
            return {
                str(k): QdrantVectorDBStorage._to_json_serializable(v)
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple, set, frozenset)):
            return [QdrantVectorDBStorage._to_json_serializable(v) for v in value]
        return value

    @staticmethod
    def _estimate_point_payload_bytes(point: models.PointStruct) -> int:
        """Estimate the compact UTF-8 JSON byte size of a single Qdrant point."""
        point_obj = {
            "id": point.id,
            "vector": QdrantVectorDBStorage._to_json_serializable(point.vector),
            "payload": QdrantVectorDBStorage._to_json_serializable(point.payload or {}),
        }
        # default=str is used only so that sizing never crashes on values the
        # json module cannot encode. Actual serialisation is left to the client.
        return len(
            json.dumps(
                point_obj,
                ensure_ascii=False,
                separators=(",", ":"),
                default=str,
            ).encode("utf-8")
        )

    @staticmethod
    def _build_upsert_batches(
        points: list[models.PointStruct],
        max_payload_bytes: int,
        max_points_per_batch: int,
    ) -> list[tuple[list[models.PointStruct], int]]:
        """Split points into batches bounded by estimated JSON size and point count.

        Args:
            points: Points to upsert, kept in order.
            max_payload_bytes: Byte budget per batch (JSON array estimate); <= 0 disables it.
            max_points_per_batch: Point budget per batch; <= 0 disables it.

        Returns:
            A list of ``(batch_points, estimated_bytes)`` tuples.

        Raises:
            ValueError: If a single point alone exceeds ``max_payload_bytes``.
        """
        if not points:
            return []

        payload_limit = max_payload_bytes if max_payload_bytes > 0 else float("inf")
        points_limit = (
            max_points_per_batch if max_points_per_batch > 0 else float("inf")
        )

        batches: list[tuple[list[models.PointStruct], int]] = []
        current_batch: list[models.PointStruct] = []
        # JSON array overhead ("[]")
        current_estimated_bytes = 2

        for point in points:
            point_size = QdrantVectorDBStorage._estimate_point_payload_bytes(point)
            point_with_array_overhead = point_size + 2
            point_id = str(point.id)

            if point_with_array_overhead > payload_limit:
                raise ValueError(
                    f"Single Qdrant point exceeds payload limit: id={point_id}, "
                    f"estimated_bytes={point_with_array_overhead}, "
                    f"limit={int(payload_limit)}"
                )

            # If the current batch is not empty, a comma precedes the next element.
            separator_overhead = 1 if current_batch else 0
            next_batch_size = current_estimated_bytes + separator_overhead + point_size

            if current_batch and (
                len(current_batch) >= points_limit or next_batch_size > payload_limit
            ):
                batches.append((current_batch, current_estimated_bytes))
                current_batch = []
                current_estimated_bytes = 2
                next_batch_size = current_estimated_bytes + point_size

            current_batch.append(point)
            current_estimated_bytes = next_batch_size

        if current_batch:
            batches.append((current_batch, current_estimated_bytes))

        return batches

    async def initialize(self):
        """Create the Qdrant client (once) and set up the collection under the init lock."""
        async with get_data_init_lock():
            if self._initialized:
                return

            try:
                if self._client is None:
                    # Environment variables take precedence over config.ini values
                    self._client = QdrantClient(
                        url=os.environ.get(
                            "QDRANT_URL", config.get("qdrant", "uri", fallback=None)
                        ),
                        api_key=os.environ.get(
                            "QDRANT_API_KEY",
                            config.get("qdrant", "apikey", fallback=None),
                        ),
                    )
                    logger.debug(
                        f"[{self.workspace}] QdrantClient created successfully"
                    )

                # Create the collection if needed, configure indexes and migrate
                # legacy data (namespace/workspace drive legacy-name discovery).
                # NOTE(review): m=0 with payload_m=16 appears to follow Qdrant's
                # multitenancy guidance (per-tenant graphs instead of one global
                # HNSW graph). Confirm against the Qdrant docs.
                QdrantVectorDBStorage.setup_collection(
                    self._client,
                    self.final_namespace,
                    namespace=self.namespace,
                    workspace=self.effective_workspace,
                    vectors_config=models.VectorParams(
                        size=self.embedding_func.embedding_dim,
                        distance=models.Distance.COSINE,
                    ),
                    hnsw_config=models.HnswConfigDiff(
                        payload_m=16,
                        m=0,
                    ),
                    model_suffix=self.model_suffix,
                )

                self._initialized = True
                logger.info(
                    f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
                )
            except Exception as e:
                logger.error(
                    f"[{self.workspace}] Failed to initialize Qdrant collection '{self.namespace}': {e}"
                )
                raise

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Embed records and upsert them into this workspace's slice of the collection.

        Args:
            data: Mapping of record id -> record dict. Each record must contain
                ``"content"`` (the text passed to the embedding function). Keys
                listed in ``self.meta_fields`` are copied into the payload.

        Returns:
            The result of the last batch's ``client.upsert`` call, or None when
            ``data`` is empty.

        Raises:
            ValueError: If the embedding function returns a different number of
                vectors than records supplied, or a single point exceeds the
                payload size limit.
        """
        logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
        if not data:
            return

        import time

        current_time = int(time.time())

        list_data = [
            {
                ID_FIELD: k,
                WORKSPACE_ID_FIELD: self.effective_workspace,
                CREATED_AT_FIELD: current_time,
                **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
            }
            for k, v in data.items()
        ]
        contents = [v["content"] for v in data.values()]
        batches = [
            contents[i : i + self._max_batch_size]
            for i in range(0, len(contents), self._max_batch_size)
        ]

        embedding_tasks = [
            self.embedding_func(batch, context="document") for batch in batches
        ]
        embeddings_list = await asyncio.gather(*embedding_tasks)
        embeddings = np.concatenate(embeddings_list)

        # Fail loudly instead of raising IndexError or pairing vectors with the
        # wrong records when the embedding function drops or adds rows.
        if len(embeddings) != len(list_data):
            raise ValueError(
                f"[{self.workspace}] Embedding count mismatch: got {len(embeddings)} "
                f"vectors for {len(list_data)} records"
            )

        list_points = [
            models.PointStruct(
                id=compute_mdhash_id_for_qdrant(
                    d[ID_FIELD], prefix=self.effective_workspace
                ),
                vector=embeddings[i],
                payload=d,
            )
            for i, d in enumerate(list_data)
        ]

        point_batches = self._build_upsert_batches(
            list_points,
            max_payload_bytes=self._max_upsert_payload_bytes,
            max_points_per_batch=self._max_upsert_points_per_batch,
        )

        if len(point_batches) > 1:
            logger.info(
                f"[{self.workspace}] Qdrant upsert split into {len(point_batches)} batches "
                f"for {len(list_points)} points (max_payload_bytes={self._max_upsert_payload_bytes}, "
                f"max_points_per_batch={self._max_upsert_points_per_batch})"
            )

        results = None
        for batch_index, (points_batch, estimated_bytes) in enumerate(point_batches, 1):
            logger.debug(
                f"[{self.workspace}] Qdrant upsert batch {batch_index}/{len(point_batches)}: "
                f"points={len(points_batch)}, estimated_payload_bytes={estimated_bytes}"
            )
            # Fail-fast: any batch failure raises immediately and stops subsequent batches.
            results = self._client.upsert(
                collection_name=self.final_namespace,
                points=points_batch,
                wait=True,
            )

        return results

    async def query(
        self, query: str, top_k: int, query_embedding: list[float] | None = None
    ) -> list[dict[str, Any]]:
        """Return up to ``top_k`` payloads in this workspace most similar to ``query``.

        Args:
            query: Text to embed when ``query_embedding`` is not supplied.
            top_k: Maximum number of results.
            query_embedding: Optional precomputed embedding that skips the embedding call.

        Returns:
            Payload dicts augmented with ``"distance"`` (the Qdrant score) and a
            guaranteed ``created_at`` key. Results below
            ``cosine_better_than_threshold`` are excluded by Qdrant.
        """
        if query_embedding is not None:
            embedding = query_embedding
        else:
            embedding_result = await self.embedding_func(
                [query], context="query", _priority=5
            )  # higher priority for query
            embedding = embedding_result[0]

        results = self._client.query_points(
            collection_name=self.final_namespace,
            query=embedding,
            limit=top_k,
            with_payload=True,
            score_threshold=self.cosine_better_than_threshold,
            query_filter=models.Filter(
                must=[workspace_filter_condition(self.effective_workspace)]
            ),
        ).points

        return [
            {
                **dp.payload,
                "distance": dp.score,
                CREATED_AT_FIELD: dp.payload.get(CREATED_AT_FIELD),
            }
            for dp in results
        ]

    async def index_done_callback(self) -> None:
        # Qdrant handles persistence automatically
        pass

    async def delete(self, ids: List[str]) -> None:
        """Delete vectors with the specified IDs from this workspace.

        Errors are logged rather than raised (best-effort deletion).

        Args:
            ids: List of original (un-hashed) vector IDs to delete.
        """
        try:
            if not ids:
                return

            # Point IDs are hashes of workspace + original id, so deleting by
            # these IDs only touches this workspace's points.
            qdrant_ids = [
                compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
                for doc_id in ids
            ]
            self._client.delete(
                collection_name=self.final_namespace,
                points_selector=models.PointIdsList(points=qdrant_ids),
                wait=True,
            )
            logger.debug(
                f"[{self.workspace}] Successfully deleted {len(ids)} vectors from {self.namespace}"
            )
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error while deleting vectors from {self.namespace}: {e}"
            )

    async def delete_entity(self, entity_name: str) -> None:
        """Delete an entity by name from this workspace (errors are logged, not raised).

        Args:
            entity_name: Name of the entity to delete.
        """
        try:
            # Entity ID derived from the name with ENTITY_PREFIX
            entity_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
            logger.debug(
                f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}"
            )

            # Look the point up by its payload id within this workspace instead
            # of reconstructing the Qdrant point ID.
            points, _ = self._client.scroll(
                collection_name=self.final_namespace,
                scroll_filter=models.Filter(
                    must=[
                        workspace_filter_condition(self.effective_workspace),
                        models.FieldCondition(
                            key=ID_FIELD, match=models.MatchValue(value=entity_id)
                        ),
                    ]
                ),
                with_payload=False,
                limit=1,
            )

            if points:
                self._client.delete(
                    collection_name=self.final_namespace,
                    points_selector=models.PointIdsList(
                        points=[point.id for point in points]
                    ),
                    wait=True,
                )
                logger.debug(
                    f"[{self.workspace}] Successfully deleted entity {entity_name}"
                )
            else:
                logger.debug(
                    f"[{self.workspace}] Entity {entity_name} not found in storage"
                )
        except Exception as e:
            logger.error(f"[{self.workspace}] Error deleting entity {entity_name}: {e}")

    async def delete_entity_relation(self, entity_name: str) -> None:
        """Delete all relations in this workspace whose src_id or tgt_id is ``entity_name``.

        Errors are logged, not raised.

        Args:
            entity_name: Name of the entity whose relations should be deleted.
        """
        try:
            # must + should => workspace matches AND (src_id matches OR tgt_id matches)
            relation_filter = models.Filter(
                must=[workspace_filter_condition(self.effective_workspace)],
                should=[
                    models.FieldCondition(
                        key="src_id", match=models.MatchValue(value=entity_name)
                    ),
                    models.FieldCondition(
                        key="tgt_id", match=models.MatchValue(value=entity_name)
                    ),
                ],
            )

            total_deleted = 0
            offset = None
            batch_size = 1000

            while True:
                # Only point IDs are needed for deletion, so skip payloads/vectors
                points, next_offset = self._client.scroll(
                    collection_name=self.final_namespace,
                    scroll_filter=relation_filter,
                    with_payload=False,
                    with_vectors=False,
                    limit=batch_size,
                    offset=offset,
                )
                if not points:
                    break

                ids_to_delete = [point.id for point in points]
                self._client.delete(
                    collection_name=self.final_namespace,
                    points_selector=models.PointIdsList(points=ids_to_delete),
                    wait=True,
                )
                total_deleted += len(ids_to_delete)

                if next_offset is None:
                    break
                offset = next_offset

            if total_deleted > 0:
                logger.debug(
                    f"[{self.workspace}] Deleted {total_deleted} relations for {entity_name}"
                )
            else:
                logger.debug(
                    f"[{self.workspace}] No relations found for entity {entity_name}"
                )
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error deleting relations for {entity_name}: {e}"
            )

    async def get_by_id(self, id: str) -> dict[str, Any] | None:
        """Get the stored payload for one vector ID in this workspace.

        Args:
            id: The original (un-hashed) identifier of the vector.

        Returns:
            The payload (with ``created_at`` guaranteed present), or None if the
            point is missing or retrieval fails (failures are logged).
        """
        try:
            # Point IDs embed the workspace, so a plain retrieve stays in-workspace
            qdrant_id = compute_mdhash_id_for_qdrant(
                id, prefix=self.effective_workspace
            )
            result = self._client.retrieve(
                collection_name=self.final_namespace,
                ids=[qdrant_id],
                with_payload=True,
            )
            if not result:
                return None

            payload = result[0].payload
            if CREATED_AT_FIELD not in payload:
                payload[CREATED_AT_FIELD] = None
            return payload
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error retrieving vector data for ID {id}: {e}"
            )
            return None

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any] | None]:
        """Get payloads for several vector IDs, aligned with the order of ``ids``.

        Args:
            ids: List of original (un-hashed) identifiers.

        Returns:
            One entry per requested ID, in request order: the payload (with
            ``created_at`` guaranteed present), or None where no point was found.
            Returns an empty list if ``ids`` is empty or retrieval fails (logged).
        """
        if not ids:
            return []

        try:
            qdrant_ids = [
                compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
                for doc_id in ids
            ]
            results = self._client.retrieve(
                collection_name=self.final_namespace,
                ids=qdrant_ids,
                with_payload=True,
            )

            # Index results by both the payload id and the Qdrant point id so
            # caller ordering can be restored whichever key matches.
            payload_by_original_id: dict[str, dict[str, Any]] = {}
            payload_by_qdrant_id: dict[str, dict[str, Any]] = {}
            for point in results:
                payload = dict(point.payload or {})
                payload.setdefault(CREATED_AT_FIELD, None)

                qdrant_point_id = str(point.id) if point.id is not None else ""
                if qdrant_point_id:
                    payload_by_qdrant_id[qdrant_point_id] = payload

                original_id = payload.get(ID_FIELD)
                if original_id is not None:
                    payload_by_original_id[str(original_id)] = payload

            ordered_payloads: list[dict[str, Any] | None] = []
            for requested_id, qdrant_id in zip(ids, qdrant_ids):
                payload = payload_by_original_id.get(str(requested_id))
                if payload is None:
                    payload = payload_by_qdrant_id.get(str(qdrant_id))
                ordered_payloads.append(payload)

            return ordered_payloads
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}"
            )
            return []

    async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
        """Get vectors by their IDs, returning only ID and vector data.

        Args:
            ids: List of original (un-hashed) identifiers.

        Returns:
            Dictionary mapping original IDs to their vector embeddings
            (``{id: [vector_values], ...}``). IDs that were not found are
            omitted. Returns an empty dict if retrieval fails (logged).
        """
        if not ids:
            return {}

        try:
            qdrant_ids = [
                compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
                for doc_id in ids
            ]
            # Payload is needed to map points back to their original IDs
            results = self._client.retrieve(
                collection_name=self.final_namespace,
                ids=qdrant_ids,
                with_vectors=True,
                with_payload=True,
            )

            vectors_dict = {}
            for point in results:
                if point and point.vector is not None and point.payload:
                    original_id = point.payload.get(ID_FIELD)
                    if original_id:
                        vector_data = point.vector
                        if isinstance(vector_data, np.ndarray):
                            vector_data = vector_data.tolist()
                        vectors_dict[original_id] = vector_data

            return vectors_dict
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error retrieving vectors by IDs from {self.namespace}: {e}"
            )
            return {}

    async def drop(self) -> dict[str, str]:
        """Delete all points belonging to the current workspace from the collection.

        The collection itself, and other workspaces' points, are left intact.

        Returns:
            dict[str, str]: Operation status and message
            - On success: {"status": "success", "message": "data dropped"}
            - On failure: {"status": "error", "message": "<error details>"}
        """
        # No need to lock: data integrity is ensured by allowing only one process to hold pipeline at a time
        try:
            self._client.delete(
                collection_name=self.final_namespace,
                points_selector=models.FilterSelector(
                    filter=models.Filter(
                        must=[workspace_filter_condition(self.effective_workspace)]
                    )
                ),
                wait=True,
            )
            logger.info(
                f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
            )
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
            )
            return {"status": "error", "message": str(e)}
|