系统正在读取当前记录的结构化信息。
diff --git a/.tmp/lightrag_inspect/lightrag_hku-1.4.16-py3-none-any.whl b/.tmp/lightrag_inspect/lightrag_hku-1.4.16-py3-none-any.whl
new file mode 100644
index 0000000..6507fe1
Binary files /dev/null and b/.tmp/lightrag_inspect/lightrag_hku-1.4.16-py3-none-any.whl differ
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/__init__.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/__init__.py
new file mode 100644
index 0000000..e269f25
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/__init__.py
@@ -0,0 +1,22 @@
+from typing import TYPE_CHECKING, Any
+
+from ._version import __version__ as __version__
+
+__all__ = ["LightRAG", "QueryParam", "__version__"]
+
+if TYPE_CHECKING:
+ from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
+
+
+def __getattr__(name: str) -> Any:
+    """Resolve ``LightRAG`` and ``QueryParam`` lazily on first attribute access.
+
+    The import of ``.lightrag`` is deferred until one of the two public names
+    is requested. The resolved object is then stored in the module globals, so
+    later lookups find it directly instead of reaching this hook again.
+
+    Args:
+        name: Attribute that was not found through normal module lookup.
+
+    Returns:
+        The ``LightRAG`` or ``QueryParam`` class.
+
+    Raises:
+        AttributeError: If ``name`` is not one of the lazily exported names.
+    """
+    if name not in {"LightRAG", "QueryParam"}:
+        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+
+    from .lightrag import LightRAG, QueryParam
+
+    resolved = LightRAG if name == "LightRAG" else QueryParam
+    globals()[name] = resolved
+    return resolved
+
+
+__author__ = "Zirui Guo"
+__url__ = "https://github.com/HKUDS/LightRAG"
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/_version.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/_version.py
new file mode 100644
index 0000000..0ccdc52
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/_version.py
@@ -0,0 +1,4 @@
+"""Lightweight version definitions shared by packaging and runtime code."""
+
+__version__ = "1.4.16"
+__api_version__ = "0291"
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/__init__.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/__init__.py
new file mode 100644
index 0000000..86ac14b
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/__init__.py
@@ -0,0 +1 @@
+from .._version import __api_version__ as __api_version__
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/auth.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/auth.py
new file mode 100644
index 0000000..04e7f3c
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/auth.py
@@ -0,0 +1,163 @@
+from datetime import datetime, timedelta, timezone
+
+import jwt
+from dotenv import load_dotenv
+from fastapi import HTTPException, status
+from pydantic import BaseModel
+
+from ..utils import logger
+from .config import DEFAULT_TOKEN_SECRET, global_args
+from .passwords import verify_password
+
+# use the .env that is inside the current folder
+# allows to use different .env file for each lightrag instance
+# the OS environment variables take precedence over the .env file
+load_dotenv(dotenv_path=".env", override=False)
+
+
+class TokenPayload(BaseModel):
+    """Claims carried inside a JWT built by ``AuthHandler.create_token``.
+
+    Attributes:
+        sub: Username the token was issued for.
+        exp: Expiration time of the token.
+        role: User role; defaults to ``"user"`` (regular user).
+        metadata: Additional metadata attached to the token.
+    """
+
+    sub: str
+    exp: datetime
+    role: str = "user"
+    metadata: dict = {}
+
+
+class AuthHandler:
+    """Issue and validate JWTs for the accounts configured in ``global_args``."""
+
+    def __init__(self):
+        """Load the JWT settings and parse ``AUTH_ACCOUNTS``.
+
+        Raises:
+            ValueError: If accounts are configured without a token secret, if
+                the JWT algorithm is missing or ``none``, or if an
+                ``AUTH_ACCOUNTS`` entry is not a ``user:password`` pair.
+        """
+        auth_accounts = global_args.auth_accounts
+        self.secret = global_args.token_secret
+        if not self.secret:
+            if auth_accounts:
+                raise ValueError(
+                    "TOKEN_SECRET must be explicitly set to a non-default value when AUTH_ACCOUNTS is configured."
+                )
+            # Guest mode only: no accounts exist, so the shared default is used.
+            self.secret = DEFAULT_TOKEN_SECRET
+            logger.warning(
+                "TOKEN_SECRET not set and AUTH_ACCOUNTS is not configured. "
+                "Falling back to the default guest-mode JWT secret. "
+            )
+        algorithm = global_args.jwt_algorithm
+        if not algorithm or algorithm.lower() == "none":
+            raise ValueError(
+                "JWT_ALGORITHM must be set to a secure algorithm (e.g. HS256). "
+                "The 'none' algorithm is not permitted."
+            )
+        self.algorithm = algorithm
+        self.expire_hours = global_args.token_expire_hours
+        self.guest_expire_hours = global_args.guest_token_expire_hours
+        self.accounts = {}
+        invalid_count = 0
+        if auth_accounts:
+            for account in auth_accounts.split(","):
+                # Split on the first colon only so passwords may contain ':'.
+                username, separator, password = account.partition(":")
+                if separator and username and password:
+                    self.accounts[username] = password
+                else:
+                    invalid_count += 1
+        if invalid_count:
+            # Never echo the malformed entries themselves: they can contain
+            # password material (e.g. ":secret", or the tail of a password
+            # that contains a comma).
+            logger.error(
+                f"Invalid account format in AUTH_ACCOUNTS: {invalid_count} malformed "
+                "entry(ies); values are not logged because they may contain credentials"
+            )
+            raise ValueError(
+                "AUTH_ACCOUNTS must use comma-separated user:password pairs."
+            )
+
+    def verify_password(self, username: str, plain_password: str) -> bool:
+        """
+        Verify password for a user. Supports explicit bcrypt values and plaintext.
+
+        Args:
+            username: Username to verify
+            plain_password: Plaintext password to check
+
+        Returns:
+            bool: True if password is correct, False otherwise (including
+                when the username is unknown)
+        """
+        if username not in self.accounts:
+            return False
+
+        stored_password = self.accounts[username]
+        # Resolves to the module-level helper imported from .passwords,
+        # not to this method.
+        return verify_password(plain_password, stored_password)
+
+    def create_token(
+        self,
+        username: str,
+        role: str = "user",
+        custom_expire_hours: float | None = None,
+        metadata: dict | None = None,
+    ) -> str:
+        """
+        Create JWT token
+
+        Args:
+            username: Username
+            role: User role, default is "user", guest is "guest"
+            custom_expire_hours: Custom expiration time (hours), if None use
+                the role's configured default
+            metadata: Additional metadata
+
+        Returns:
+            str: Encoded JWT token
+        """
+        # Choose default expiration time based on role
+        if custom_expire_hours is None:
+            if role == "guest":
+                expire_hours = self.guest_expire_hours
+            else:
+                expire_hours = self.expire_hours
+        else:
+            expire_hours = custom_expire_hours
+
+        expire = datetime.now(timezone.utc) + timedelta(hours=expire_hours)
+
+        # Create payload
+        payload = TokenPayload(
+            sub=username, exp=expire, role=role, metadata=metadata or {}
+        )
+
+        return jwt.encode(payload.model_dump(), self.secret, algorithm=self.algorithm)
+
+    def validate_token(self, token: str) -> dict:
+        """
+        Validate JWT token
+
+        Args:
+            token: JWT token
+
+        Returns:
+            dict: Dictionary with "username", "role", "metadata" and "exp"
+
+        Raises:
+            HTTPException: 401 if the token is invalid, expired, or lacks the
+                "exp"/"sub" claims; 500 if the configured algorithm is "none"
+        """
+        # Explicitly exclude 'none' to prevent algorithm confusion attacks
+        allowed_algorithms = [self.algorithm]
+        if "none" in (a.lower() for a in allowed_algorithms):
+            raise HTTPException(
+                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                detail="Insecure JWT algorithm configuration",
+            )
+
+        try:
+            # Require "exp" and "sub": without this a signed token lacking
+            # them would raise KeyError below (HTTP 500) and never expire.
+            payload = jwt.decode(
+                token,
+                self.secret,
+                algorithms=allowed_algorithms,
+                options={"require": ["exp", "sub"]},
+            )
+        except jwt.PyJWTError as exc:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
+            ) from exc
+
+        expire_time = datetime.fromtimestamp(payload["exp"], timezone.utc)
+
+        # jwt.decode already rejects expired tokens (reported above as
+        # "Invalid token"); this explicit check is kept as a second guard.
+        if datetime.now(timezone.utc) > expire_time:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
+            )
+
+        # Return complete payload instead of just username
+        return {
+            "username": payload["sub"],
+            "role": payload.get("role", "user"),
+            "metadata": payload.get("metadata", {}),
+            "exp": expire_time,
+        }
+
+
+# Module-level AuthHandler instance. It is constructed at import time, so any
+# ValueError raised by AuthHandler.__init__ surfaces when this module is imported.
+auth_handler = AuthHandler()
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/config.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/config.py
new file mode 100644
index 0000000..981653c
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/config.py
@@ -0,0 +1,697 @@
+"""
+Configs for the LightRAG API.
+"""
+
+import os
+import re
+import argparse
+import logging
+from dotenv import load_dotenv
+from lightrag.utils import get_env_value, logger
+from lightrag.llm.binding_options import (
+ GeminiEmbeddingOptions,
+ GeminiLLMOptions,
+ OllamaEmbeddingOptions,
+ OllamaLLMOptions,
+ OpenAILLMOptions,
+)
+from lightrag.base import OllamaServerInfos
+import sys
+
+from lightrag.constants import (
+ DEFAULT_WOKERS,
+ DEFAULT_TIMEOUT,
+ DEFAULT_TOP_K,
+ DEFAULT_CHUNK_TOP_K,
+ DEFAULT_HISTORY_TURNS,
+ DEFAULT_MAX_ENTITY_TOKENS,
+ DEFAULT_MAX_RELATION_TOKENS,
+ DEFAULT_MAX_TOTAL_TOKENS,
+ DEFAULT_COSINE_THRESHOLD,
+ DEFAULT_RELATED_CHUNK_NUMBER,
+ DEFAULT_MIN_RERANK_SCORE,
+ DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
+ DEFAULT_MAX_ASYNC,
+ DEFAULT_SUMMARY_MAX_TOKENS,
+ DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
+ DEFAULT_SUMMARY_CONTEXT_SIZE,
+ DEFAULT_SUMMARY_LANGUAGE,
+ DEFAULT_EMBEDDING_FUNC_MAX_ASYNC,
+ DEFAULT_EMBEDDING_BATCH_NUM,
+ DEFAULT_OLLAMA_MODEL_NAME,
+ DEFAULT_OLLAMA_MODEL_TAG,
+ DEFAULT_RERANK_BINDING,
+ DEFAULT_ENTITY_TYPES,
+)
+
+# use the .env that is inside the current folder
+# allows to use different .env file for each lightrag instance
+# the OS environment variables take precedence over the .env file
+load_dotenv(dotenv_path=".env", override=False)
+
+
+# Module-level Ollama server info object; parse_args() assigns its
+# LIGHTRAG_NAME / LIGHTRAG_TAG from the simulated-model options.
+ollama_server_infos = OllamaServerInfos()
+# Built-in JWT secret. validate_auth_configuration() rejects it whenever
+# AUTH_ACCOUNTS is configured.
+DEFAULT_TOKEN_SECRET = "lightrag-jwt-default-secret-key!"
+# Environment value meaning "explicitly configured, with an empty prefix"
+# (see get_embedding_prefix_config()).
+NO_PREFIX_SENTINEL = "NO_PREFIX"
+# Bindings for which asymmetric mode uses provider task parameters; configured
+# prefixes are ignored with a warning.
+PROVIDER_ASYMMETRIC_EMBEDDING_BINDINGS = {"gemini", "jina", "voyageai"}
+# Bindings for which asymmetric mode requires both the query and the document
+# prefix to be configured.
+PREFIX_ASYMMETRIC_EMBEDDING_BINDINGS = {"azure_openai", "ollama", "openai"}
+
+
+class DefaultRAGStorageConfig:
+    """Storage class names used when no LIGHTRAG_*_STORAGE override is set.
+
+    parse_args() passes these as the fallback values for the
+    LIGHTRAG_KV_STORAGE, LIGHTRAG_VECTOR_STORAGE, LIGHTRAG_GRAPH_STORAGE and
+    LIGHTRAG_DOC_STATUS_STORAGE environment variables.
+    """
+
+    KV_STORAGE = "JsonKVStorage"
+    VECTOR_STORAGE = "NanoVectorDBStorage"
+    GRAPH_STORAGE = "NetworkXStorage"
+    DOC_STATUS_STORAGE = "JsonDocStatusStorage"
+
+
+def get_default_host(binding_type: str) -> str:
+    """Return the default host URL for a binding type.
+
+    Each known binding maps to an environment variable and a built-in URL; the
+    environment value wins whenever the variable is set (even to an empty
+    string). Unknown bindings are treated like ``ollama``.
+
+    Args:
+        binding_type: Binding name such as ``"ollama"`` or ``"openai"``.
+
+    Returns:
+        str: The environment override if present, otherwise the built-in URL.
+    """
+    ollama_source = ("LLM_BINDING_HOST", "http://localhost:11434")
+    # binding -> (environment variable consulted, built-in fallback URL)
+    host_sources = {
+        "ollama": ollama_source,
+        "lollms": ("LLM_BINDING_HOST", "http://localhost:9600"),
+        "azure_openai": ("AZURE_OPENAI_ENDPOINT", "https://api.openai.com/v1"),
+        "openai": ("LLM_BINDING_HOST", "https://api.openai.com/v1"),
+        "gemini": ("LLM_BINDING_HOST", "https://generativelanguage.googleapis.com"),
+    }
+    # fallback to ollama if unknown
+    env_key, builtin_url = host_sources.get(binding_type, ollama_source)
+    return os.getenv(env_key, builtin_url)
+
+
+def resolve_asymmetric_embedding_opt_in(
+    *,
+    binding: str,
+    embedding_asymmetric: bool,
+    embedding_asymmetric_configured: bool,
+    query_prefix: str | None,
+    document_prefix: str | None,
+    query_prefix_configured: bool = False,
+    document_prefix_configured: bool = False,
+) -> bool:
+    """Decide whether query/document-aware embedding should be enabled.
+
+    Args:
+        binding: Embedding binding name.
+        embedding_asymmetric: Value of the EMBEDDING_ASYMMETRIC toggle.
+        embedding_asymmetric_configured: Whether the toggle was set explicitly
+            (only affects the wording of the warning).
+        query_prefix: Resolved query prefix, possibly empty or None.
+        document_prefix: Resolved document prefix, possibly empty or None.
+        query_prefix_configured: Whether the query prefix was configured.
+        document_prefix_configured: Whether the document prefix was configured.
+
+    Returns:
+        bool: True when asymmetric behavior is enabled, False otherwise.
+
+    Raises:
+        ValueError: If the toggle is on but the binding does not support it, or
+            a prefix-based binding lacks a complete, non-empty prefix setup.
+    """
+    any_prefix_configured = query_prefix_configured or document_prefix_configured
+
+    # Toggle off: prefixes are irrelevant; warn if someone configured them.
+    if not embedding_asymmetric:
+        if any_prefix_configured:
+            state = "false" if embedding_asymmetric_configured else "unset"
+            logger.warning(
+                f"EMBEDDING_ASYMMETRIC is {state}; "
+                "EMBEDDING_QUERY_PREFIX and EMBEDDING_DOCUMENT_PREFIX will be ignored."
+            )
+        return False
+
+    # Provider-side asymmetric support: enabled, prefixes are not used.
+    if binding in PROVIDER_ASYMMETRIC_EMBEDDING_BINDINGS:
+        if any_prefix_configured:
+            logger.warning(
+                f"{binding} embeddings use provider task parameters for asymmetric "
+                "mode; EMBEDDING_QUERY_PREFIX and EMBEDDING_DOCUMENT_PREFIX will be ignored."
+            )
+        return True
+
+    if binding not in PREFIX_ASYMMETRIC_EMBEDDING_BINDINGS:
+        raise ValueError(
+            f"EMBEDDING_ASYMMETRIC=true is not supported for {binding} embeddings."
+        )
+
+    # Prefix-based bindings: both sides must be configured explicitly ...
+    if not (query_prefix_configured and document_prefix_configured):
+        raise ValueError(
+            f"EMBEDDING_ASYMMETRIC=true for {binding} embeddings requires both "
+            "EMBEDDING_QUERY_PREFIX and EMBEDDING_DOCUMENT_PREFIX. Use "
+            f"{NO_PREFIX_SENTINEL} for a side that should intentionally have no prefix."
+        )
+
+    # ... and at least one of them must actually carry text.
+    if not (query_prefix or document_prefix):
+        raise ValueError(
+            "At least one of EMBEDDING_QUERY_PREFIX or EMBEDDING_DOCUMENT_PREFIX "
+            f"must be non-empty. Use {NO_PREFIX_SENTINEL} only for the side that "
+            "should intentionally have no prefix."
+        )
+    return True
+
+
+def get_embedding_prefix_config(env_key: str) -> tuple[str | None, bool]:
+    """Read an embedding prefix from the environment.
+
+    Args:
+        env_key: Name of the environment variable holding the prefix.
+
+    Returns:
+        tuple[str | None, bool]: ``(prefix, configured)``. A missing variable
+        or the literal ``"None"`` yields ``(None, False)``; the NO_PREFIX
+        sentinel yields ``("", True)``; any other non-empty value is returned
+        unchanged with ``True``.
+
+    Raises:
+        ValueError: If the variable is set to an empty string.
+    """
+    raw = os.environ.get(env_key)
+    if raw is None or raw == "None":
+        return None, False
+    if raw == NO_PREFIX_SENTINEL:
+        return "", True
+    if raw == "":
+        # An empty value is ambiguous, so the caller must choose explicitly.
+        raise ValueError(
+            f"{env_key} is empty. Use {NO_PREFIX_SENTINEL} to explicitly request "
+            "no prefix, or remove the variable to leave it unconfigured."
+        )
+    return raw, True
+
+
+def validate_auth_configuration(args: argparse.Namespace) -> None:
+    """Reject insecure JWT auth settings before the API starts.
+
+    Args:
+        args: Configuration namespace; missing or None ``auth_accounts`` /
+            ``token_secret`` attributes are treated as empty strings.
+
+    Raises:
+        ValueError: If accounts are configured while the token secret is
+            blank or equal to the built-in default.
+    """
+    accounts = (getattr(args, "auth_accounts", "") or "").strip()
+    secret = (getattr(args, "token_secret", "") or "").strip()
+
+    # Without accounts there is nothing to protect.
+    if not accounts:
+        return
+    if secret and secret != DEFAULT_TOKEN_SECRET:
+        return
+    raise ValueError(
+        "TOKEN_SECRET must be explicitly set to a non-default value when AUTH_ACCOUNTS is configured."
+    )
+
+
+def parse_args() -> argparse.Namespace:
+ """
+ Parse command line arguments with environment variable fallback.
+
+ Takes no parameters: options are read from sys.argv, and most defaults (plus
+ every setting that has no command line flag) come from environment variables
+ via get_env_value().
+
+ Returns:
+ argparse.Namespace: Parsed arguments, extended with the environment-only settings
+
+ Raises:
+ ValueError: Propagated from get_embedding_prefix_config() or
+ validate_auth_configuration() when the configuration is rejected
+ """
+
+ parser = argparse.ArgumentParser(description="LightRAG API Server")
+
+ # Server configuration
+ parser.add_argument(
+ "--host",
+ default=get_env_value("HOST", "0.0.0.0"),
+ help="Server host (default: from env or 0.0.0.0)",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=get_env_value("PORT", 9621, int),
+ help="Server port (default: from env or 9621)",
+ )
+
+ # Directory configuration
+ parser.add_argument(
+ "--working-dir",
+ default=get_env_value("WORKING_DIR", "./rag_storage"),
+ help="Working directory for RAG storage (default: from env or ./rag_storage)",
+ )
+ parser.add_argument(
+ "--input-dir",
+ default=get_env_value("INPUT_DIR", "./inputs"),
+ help="Directory containing input documents (default: from env or ./inputs)",
+ )
+
+ parser.add_argument(
+ "--timeout",
+ default=get_env_value("TIMEOUT", DEFAULT_TIMEOUT, int, special_none=True),
+ type=int,
+ help="Timeout in seconds (useful when using slow AI). Use None for infinite timeout",
+ )
+
+ # RAG configuration
+ parser.add_argument(
+ "--max-async",
+ type=int,
+ default=get_env_value("MAX_ASYNC", DEFAULT_MAX_ASYNC, int),
+ help=f"Maximum async operations (default: from env or {DEFAULT_MAX_ASYNC})",
+ )
+ parser.add_argument(
+ "--summary-max-tokens",
+ type=int,
+ default=get_env_value("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS, int),
+ help=f"Maximum token size for entity/relation summary(default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
+ )
+ parser.add_argument(
+ "--summary-context-size",
+ type=int,
+ default=get_env_value(
+ "SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE, int
+ ),
+ help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})",
+ )
+ # NOTE(review): the help text below repeats "LLM Summary Context size" from
+ # the previous option; it looks copy-pasted - confirm the intended wording.
+ parser.add_argument(
+ "--summary-length-recommended",
+ type=int,
+ default=get_env_value(
+ "SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED, int
+ ),
+ help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_LENGTH_RECOMMENDED})",
+ )
+
+ # Logging configuration
+ parser.add_argument(
+ "--log-level",
+ default=get_env_value("LOG_LEVEL", "INFO"),
+ choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+ help="Logging level (default: from env or INFO)",
+ )
+ parser.add_argument(
+ "--verbose",
+ action="store_true",
+ default=get_env_value("VERBOSE", False, bool),
+ help="Enable verbose debug output(only valid for DEBUG log-level)",
+ )
+
+ parser.add_argument(
+ "--key",
+ type=str,
+ default=get_env_value("LIGHTRAG_API_KEY", None),
+ help="API key for authentication. This protects lightrag server against unauthorized access",
+ )
+
+ # Optional https parameters
+ parser.add_argument(
+ "--ssl",
+ action="store_true",
+ default=get_env_value("SSL", False, bool),
+ help="Enable HTTPS (default: from env or False)",
+ )
+ parser.add_argument(
+ "--ssl-certfile",
+ default=get_env_value("SSL_CERTFILE", None),
+ help="Path to SSL certificate file (required if --ssl is enabled)",
+ )
+ parser.add_argument(
+ "--ssl-keyfile",
+ default=get_env_value("SSL_KEYFILE", None),
+ help="Path to SSL private key file (required if --ssl is enabled)",
+ )
+
+ # Ollama model configuration
+ parser.add_argument(
+ "--simulated-model-name",
+ type=str,
+ default=get_env_value("OLLAMA_EMULATING_MODEL_NAME", DEFAULT_OLLAMA_MODEL_NAME),
+ help="Name for the simulated Ollama model (default: from env or lightrag)",
+ )
+
+ parser.add_argument(
+ "--simulated-model-tag",
+ type=str,
+ default=get_env_value("OLLAMA_EMULATING_MODEL_TAG", DEFAULT_OLLAMA_MODEL_TAG),
+ help="Tag for the simulated Ollama model (default: from env or latest)",
+ )
+
+ # Namespace
+ parser.add_argument(
+ "--workspace",
+ type=str,
+ default=get_env_value("WORKSPACE", ""),
+ help="Default workspace for all storage",
+ )
+
+ # Server workers configuration
+ parser.add_argument(
+ "--workers",
+ type=int,
+ default=get_env_value("WORKERS", DEFAULT_WOKERS, int),
+ help="Number of worker processes (default: from env or 1)",
+ )
+
+ # LLM and embedding bindings
+ parser.add_argument(
+ "--llm-binding",
+ type=str,
+ default=get_env_value("LLM_BINDING", "ollama"),
+ choices=[
+ "lollms",
+ "ollama",
+ "openai",
+ "openai-ollama",
+ "azure_openai",
+ "aws_bedrock",
+ "gemini",
+ ],
+ help="LLM binding type (default: from env or ollama)",
+ )
+ parser.add_argument(
+ "--embedding-binding",
+ type=str,
+ default=get_env_value("EMBEDDING_BINDING", "ollama"),
+ choices=[
+ "lollms",
+ "ollama",
+ "openai",
+ "azure_openai",
+ "aws_bedrock",
+ "jina",
+ "gemini",
+ "voyageai",
+ ],
+ help="Embedding binding type (default: from env or ollama)",
+ )
+ parser.add_argument(
+ "--rerank-binding",
+ type=str,
+ default=get_env_value("RERANK_BINDING", DEFAULT_RERANK_BINDING),
+ choices=["null", "cohere", "jina", "aliyun"],
+ help=f"Rerank binding type (default: from env or {DEFAULT_RERANK_BINDING})",
+ )
+
+ # Document loading engine configuration
+ # NOTE(review): the flag itself defaults to False; the DOCUMENT_LOADING_ENGINE
+ # environment fallback is applied further down, only when the flag is absent.
+ parser.add_argument(
+ "--docling",
+ action="store_true",
+ default=False,
+ help="Enable DOCLING document loading engine (default: from env or DEFAULT)",
+ )
+
+ # Conditionally add binding-specific options (Ollama, OpenAI, Azure OpenAI, Gemini)
+ # This registers command line arguments (e.g., --openai-llm-temperature)
+ # and reads corresponding environment variables (e.g., OPENAI_LLM_TEMPERATURE)
+
+ # Determine LLM binding value consistently from command line or environment
+ # parser.parse_args() has not run yet at this point, so sys.argv is inspected
+ # directly to decide which binding-specific options to register.
+ llm_binding_value = None
+ if "--llm-binding" in sys.argv:
+ # NOTE(review): list.index raises ValueError (not IndexError) for a missing
+ # item; the membership test above already rules that out, so this handler
+ # appears to be unreachable.
+ try:
+ idx = sys.argv.index("--llm-binding")
+ if idx + 1 < len(sys.argv) and not sys.argv[idx + 1].startswith("-"):
+ llm_binding_value = sys.argv[idx + 1]
+ except IndexError:
+ pass
+
+ # Fall back to environment variable using same function as argparse default
+ if llm_binding_value is None:
+ llm_binding_value = get_env_value("LLM_BINDING", "ollama")
+
+ # Add LLM binding options based on determined value
+ if llm_binding_value == "ollama":
+ OllamaLLMOptions.add_args(parser)
+ elif llm_binding_value in ["openai", "azure_openai"]:
+ OpenAILLMOptions.add_args(parser)
+ elif llm_binding_value == "gemini":
+ GeminiLLMOptions.add_args(parser)
+
+ # Determine embedding binding value consistently from command line or environment
+ embedding_binding_value = None
+ if "--embedding-binding" in sys.argv:
+ try:
+ idx = sys.argv.index("--embedding-binding")
+ if idx + 1 < len(sys.argv) and not sys.argv[idx + 1].startswith("-"):
+ embedding_binding_value = sys.argv[idx + 1]
+ except IndexError:
+ pass
+
+ # Fall back to environment variable using same function as argparse default
+ if embedding_binding_value is None:
+ embedding_binding_value = get_env_value("EMBEDDING_BINDING", "ollama")
+
+ # Add embedding binding options based on determined value
+ if embedding_binding_value == "ollama":
+ OllamaEmbeddingOptions.add_args(parser)
+ elif embedding_binding_value == "gemini":
+ GeminiEmbeddingOptions.add_args(parser)
+
+ args = parser.parse_args()
+
+ # convert relative path to absolute path
+ args.working_dir = os.path.abspath(args.working_dir)
+ args.input_dir = os.path.abspath(args.input_dir)
+
+ # Inject storage configuration from environment variables
+ args.kv_storage = get_env_value(
+ "LIGHTRAG_KV_STORAGE", DefaultRAGStorageConfig.KV_STORAGE
+ )
+ args.doc_status_storage = get_env_value(
+ "LIGHTRAG_DOC_STATUS_STORAGE", DefaultRAGStorageConfig.DOC_STATUS_STORAGE
+ )
+ args.graph_storage = get_env_value(
+ "LIGHTRAG_GRAPH_STORAGE", DefaultRAGStorageConfig.GRAPH_STORAGE
+ )
+ args.vector_storage = get_env_value(
+ "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE
+ )
+
+ # Get MAX_PARALLEL_INSERT from environment
+ args.max_parallel_insert = get_env_value("MAX_PARALLEL_INSERT", 2, int)
+
+ # Get MAX_GRAPH_NODES from environment
+ args.max_graph_nodes = get_env_value("MAX_GRAPH_NODES", 1000, int)
+
+ # Handle openai-ollama special case
+ # (the combined value is rewritten to an openai LLM binding plus an ollama
+ # embedding binding, overriding any --embedding-binding value)
+ if args.llm_binding == "openai-ollama":
+ args.llm_binding = "openai"
+ args.embedding_binding = "ollama"
+
+ args.llm_binding_host = get_env_value(
+ "LLM_BINDING_HOST", get_default_host(args.llm_binding)
+ )
+ args.embedding_binding_host = get_env_value(
+ "EMBEDDING_BINDING_HOST", get_default_host(args.embedding_binding)
+ )
+ args.llm_binding_api_key = get_env_value("LLM_BINDING_API_KEY", None)
+ args.embedding_binding_api_key = get_env_value("EMBEDDING_BINDING_API_KEY", "")
+
+ # Inject model configuration
+ args.llm_model = get_env_value("LLM_MODEL", "mistral-nemo:latest")
+ # EMBEDDING_MODEL defaults to None - each binding will use its own default model
+ # e.g., OpenAI uses "text-embedding-3-small", Jina uses "jina-embeddings-v4"
+ args.embedding_model = get_env_value("EMBEDDING_MODEL", None, special_none=True)
+ # EMBEDDING_DIM defaults to None - each binding will use its own default dimension
+ # Value is inherited from provider defaults via wrap_embedding_func_with_attrs decorator
+ args.embedding_dim = get_env_value("EMBEDDING_DIM", None, int, special_none=True)
+ args.embedding_send_dim = get_env_value("EMBEDDING_SEND_DIM", False, bool)
+
+ # Inject chunk configuration
+ args.chunk_size = get_env_value("CHUNK_SIZE", 1200, int)
+ args.chunk_overlap_size = get_env_value("CHUNK_OVERLAP_SIZE", 100, int)
+
+ # Inject LLM cache configuration
+ args.enable_llm_cache_for_extract = get_env_value(
+ "ENABLE_LLM_CACHE_FOR_EXTRACT", True, bool
+ )
+ args.enable_llm_cache = get_env_value("ENABLE_LLM_CACHE", True, bool)
+
+ # Set document_loading_engine from --docling flag
+ if args.docling:
+ args.document_loading_engine = "DOCLING"
+ else:
+ args.document_loading_engine = get_env_value(
+ "DOCUMENT_LOADING_ENGINE", "DEFAULT"
+ )
+
+ # PDF decryption password
+ args.pdf_decrypt_password = get_env_value("PDF_DECRYPT_PASSWORD", None)
+
+ # Add environment variables that were previously read directly
+ args.cors_origins = get_env_value("CORS_ORIGINS", "*")
+ args.summary_language = get_env_value("SUMMARY_LANGUAGE", DEFAULT_SUMMARY_LANGUAGE)
+ args.entity_types = get_env_value("ENTITY_TYPES", DEFAULT_ENTITY_TYPES, list)
+ args.whitelist_paths = get_env_value("WHITELIST_PATHS", "/health,/api/*")
+
+ # For JWT Auth
+ args.auth_accounts = get_env_value("AUTH_ACCOUNTS", "")
+ args.token_secret = get_env_value("TOKEN_SECRET", None)
+ args.token_expire_hours = get_env_value("TOKEN_EXPIRE_HOURS", 48, float)
+ args.guest_token_expire_hours = get_env_value("GUEST_TOKEN_EXPIRE_HOURS", 24, float)
+ args.jwt_algorithm = get_env_value("JWT_ALGORITHM", "HS256")
+
+ # Token auto-renewal configuration (sliding window expiration)
+ args.token_auto_renew = get_env_value("TOKEN_AUTO_RENEW", True, bool)
+ args.token_renew_threshold = get_env_value("TOKEN_RENEW_THRESHOLD", 0.5, float)
+
+ # Rerank model configuration
+ args.rerank_model = get_env_value("RERANK_MODEL", None)
+ args.rerank_binding_host = get_env_value("RERANK_BINDING_HOST", None)
+ args.rerank_binding_api_key = get_env_value("RERANK_BINDING_API_KEY", None)
+ # Note: rerank_binding is already set by argparse, no need to override from env
+
+ # Min rerank score configuration
+ args.min_rerank_score = get_env_value(
+ "MIN_RERANK_SCORE", DEFAULT_MIN_RERANK_SCORE, float
+ )
+
+ # Query configuration
+ args.history_turns = get_env_value("HISTORY_TURNS", DEFAULT_HISTORY_TURNS, int)
+ args.top_k = get_env_value("TOP_K", DEFAULT_TOP_K, int)
+ args.chunk_top_k = get_env_value("CHUNK_TOP_K", DEFAULT_CHUNK_TOP_K, int)
+ args.max_entity_tokens = get_env_value(
+ "MAX_ENTITY_TOKENS", DEFAULT_MAX_ENTITY_TOKENS, int
+ )
+ args.max_relation_tokens = get_env_value(
+ "MAX_RELATION_TOKENS", DEFAULT_MAX_RELATION_TOKENS, int
+ )
+ args.max_total_tokens = get_env_value(
+ "MAX_TOTAL_TOKENS", DEFAULT_MAX_TOTAL_TOKENS, int
+ )
+ args.cosine_threshold = get_env_value(
+ "COSINE_THRESHOLD", DEFAULT_COSINE_THRESHOLD, float
+ )
+ args.related_chunk_number = get_env_value(
+ "RELATED_CHUNK_NUMBER", DEFAULT_RELATED_CHUNK_NUMBER, int
+ )
+
+ # Add missing environment variables for health endpoint
+ args.force_llm_summary_on_merge = get_env_value(
+ "FORCE_LLM_SUMMARY_ON_MERGE", DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, int
+ )
+ args.embedding_func_max_async = get_env_value(
+ "EMBEDDING_FUNC_MAX_ASYNC", DEFAULT_EMBEDDING_FUNC_MAX_ASYNC, int
+ )
+ args.embedding_batch_num = get_env_value(
+ "EMBEDDING_BATCH_NUM", DEFAULT_EMBEDDING_BATCH_NUM, int
+ )
+
+ # Embedding token limit configuration
+ args.embedding_token_limit = get_env_value(
+ "EMBEDDING_TOKEN_LIMIT", None, int, special_none=True
+ )
+
+ # File upload size limit (in bytes, None for unlimited)
+ # Default: 100MB (104857600 bytes)
+ args.max_upload_size = get_env_value(
+ "MAX_UPLOAD_SIZE", 104857600, int, special_none=True
+ )
+
+ # Embedding prefix configuration for context-aware embeddings. Empty prefixes
+ # must be explicit via NO_PREFIX so missing config is distinguishable.
+ (
+ args.embedding_document_prefix,
+ args.embedding_document_prefix_configured,
+ ) = get_embedding_prefix_config("EMBEDDING_DOCUMENT_PREFIX")
+ (
+ args.embedding_query_prefix,
+ args.embedding_query_prefix_configured,
+ ) = get_embedding_prefix_config("EMBEDDING_QUERY_PREFIX")
+ args.embedding_prefix_no_prefix_sentinel = NO_PREFIX_SENTINEL
+ args.embedding_prefixes_configured = (
+ args.embedding_document_prefix_configured
+ or args.embedding_query_prefix_configured
+ )
+ # Asymmetric embedding behavior toggle
+ # (the *_configured flag records whether the variable is present at all, so
+ # an explicit false can be told apart from an unset toggle)
+ args.embedding_asymmetric_configured = "EMBEDDING_ASYMMETRIC" in os.environ
+ args.embedding_asymmetric = get_env_value("EMBEDDING_ASYMMETRIC", False, bool)
+
+ ollama_server_infos.LIGHTRAG_NAME = args.simulated_model_name
+ ollama_server_infos.LIGHTRAG_TAG = args.simulated_model_tag
+
+ # Sanitize workspace: only alphanumeric characters and underscores are allowed
+ if args.workspace:
+ sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", args.workspace)
+ if sanitized != args.workspace:
+ # NOTE(review): this goes through the root `logging` module rather than
+ # the imported `logger` used elsewhere in this file - confirm intent.
+ logging.warning(
+ f"Workspace name '{args.workspace}' contains invalid characters. "
+ f"It has been sanitized to '{sanitized}'. "
+ "Only alphanumeric characters and underscores are allowed."
+ )
+ args.workspace = sanitized
+
+ validate_auth_configuration(args)
+ return args
+
+
+def update_uvicorn_mode_config():
+    """Clamp ``global_args.workers`` to 1 when more than one worker is configured.
+
+    The override is reported through a debug-level log record that includes
+    the originally requested worker count.
+    """
+    requested_workers = global_args.workers
+    if not requested_workers > 1:
+        return
+
+    global_args.workers = 1
+    # Reported at debug level directly here
+    logging.debug(
+        f">> Forcing workers=1 in uvicorn mode(Ignoring workers={requested_workers})"
+    )
+
+
+# Global configuration with lazy initialization
+# _global_args holds the namespace stored by initialize_config(); it stays
+# None until then.
+_global_args = None
+# Set to True by initialize_config() once _global_args has been populated.
+_initialized = False
+
+
+def initialize_config(args=None, force=False):
+    """Initialize the global configuration.
+
+    Explicit initialization is useful for programmatic usage, testing, or
+    embedding LightRAG in other applications. Once initialized, later calls
+    return the stored configuration unchanged unless ``force`` is set.
+
+    Args:
+        args: Pre-parsed argparse.Namespace or None to parse from sys.argv
+        force: Force re-initialization even if already initialized
+
+    Returns:
+        argparse.Namespace: The configured arguments
+
+    Raises:
+        ValueError: If validate_auth_configuration() rejects the arguments.
+
+    Example:
+        # Use parsed command line arguments (default)
+        initialize_config()
+
+        # Use custom configuration programmatically
+        custom_args = argparse.Namespace(
+            host='localhost',
+            port=8080,
+            working_dir='./custom_rag',
+            # ... other config
+        )
+        initialize_config(custom_args)
+    """
+    global _global_args, _initialized
+
+    if force or not _initialized:
+        candidate = parse_args() if args is None else args
+        # Validate before publishing so a rejected config is never stored.
+        validate_auth_configuration(candidate)
+        _global_args = candidate
+        _initialized = True
+
+    return _global_args
+
+
+def get_config():
+    """Return the global configuration, initializing it on first use.
+
+    Returns:
+        argparse.Namespace: The configured arguments
+    """
+    if _initialized:
+        return _global_args
+    # initialize_config() stores the namespace and returns that same object.
+    return initialize_config()
+
+
+class _GlobalArgsProxy:
+ """Proxy object that auto-initializes configuration on first access
+
+ This maintains backward compatibility with existing code while
+ allowing programmatic control over initialization timing.
+
+ The proxy fully delegates to the underlying argparse.Namespace,
+ including support for vars() calls which is used by binding_options
+ to extract provider-specific configuration options.
+ """
+
+ def __getattribute__(self, name):
+ """Override attribute access to support vars() and regular attribute access.
+
+ This method intercepts __dict__ access (used by vars()) and delegates
+ to the underlying _global_args namespace, ensuring binding options
+ can be properly extracted.
+ """
+ global _initialized, _global_args
+
+ # Handle __dict__ access for vars() support
+ if name == "__dict__":
+ if not _initialized:
+ initialize_config()
+ return vars(_global_args)
+
+ # Handle class-level attributes that should come from the proxy itself
+ if name in ("__class__", "__repr__", "__getattribute__", "__setattr__"):
+ return object.__getattribute__(self, name)
+
+ # Delegate all other attribute access to the underlying namespace
+ if not _initialized:
+ initialize_config()
+ return getattr(_global_args, name)
+
+ def __setattr__(self, name, value):
+ global _initialized, _global_args
+ if not _initialized:
+ initialize_config()
+ setattr(_global_args, name, value)
+
+ def __repr__(self):
+ global _initialized, _global_args
+ if not _initialized:
+ return "
") # Windows newline -> \n
+ .replace("\r", "
") # Mac newline -> \n
+ .replace("\n", "
") # Unix newline -> \n
+ )
+
+ content_parts = []
+ in_table = False # Track if we're currently processing a table
+
+ # Iterate through all body elements in document order
+ for element in doc.element.body:
+ # Check if element is a paragraph
+ if element.tag.endswith("p"):
+ # If coming out of a table, add blank line after table
+ if in_table:
+ content_parts.append("") # Blank line after table
+ in_table = False
+
+ paragraph = Paragraph(element, doc)
+ text = paragraph.text
+ # Always append to preserve document spacing (including blank paragraphs)
+ content_parts.append(text)
+
+ # Check if element is a table
+ elif element.tag.endswith("tbl"):
+ # Add blank line before table (if content exists)
+ if content_parts and not in_table:
+ content_parts.append("") # Blank line before table
+
+ in_table = True
+ table = Table(element, doc)
+ for row in table.rows:
+ row_text = []
+ for cell in row.cells:
+ cell_text = cell.text
+ # Escape special characters to preserve tab-delimited structure
+ row_text.append(escape_cell(cell_text))
+ # Only add row if at least one cell has content
+ if any(cell for cell in row_text):
+ content_parts.append("\t".join(row_text))
+
+ return "\n".join(content_parts)
+
+
+def _extract_pptx(file_bytes: bytes) -> str:
+    """Extract PPTX content (synchronous).
+
+    Walks the slides in order and collects the ``text`` of every shape that
+    exposes one, ending each shape's text with a newline.
+
+    Args:
+        file_bytes: PPTX file content as bytes
+
+    Returns:
+        str: Extracted text content (empty string when no shape carries text)
+    """
+    from pptx import Presentation  # type: ignore
+
+    prs = Presentation(BytesIO(file_bytes))
+    # Assemble the result with one join instead of repeated ``+=``, which
+    # re-copies the accumulated string for every shape.
+    return "".join(
+        shape.text + "\n"
+        for slide in prs.slides
+        for shape in slide.shapes
+        if hasattr(shape, "text")
+    )
+
+
+def _extract_xlsx(file_bytes: bytes) -> str:
+ """Extract XLSX content in tab-delimited format with clear sheet separation.
+
+ This function processes Excel workbooks and converts them to a structured text format
+ suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
+ separator lines, and special characters are escaped to preserve the tab-delimited structure.
+
+ Features:
+ - Each sheet is wrapped with '====================' separators for visual distinction
+ - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
+ - Column alignment is preserved across all rows to maintain tabular structure
+ - Empty rows are preserved as blank lines to maintain row structure
+ - Uses sheet.max_column to determine column width efficiently
+
+ Args:
+ file_bytes: XLSX file content as bytes
+
+ Returns:
+ str: Extracted text content with all sheets in tab-delimited format.
+ Format: Sheet separators, sheet name, then tab-delimited rows.
+
+ Example output:
+ ==================== Sheet: Data ====================
+ Name\tAge\tCity
+ Alice\t30\tNew York
+ Bob\t25\tLondon
+
+ ==================== Sheet: Summary ====================
+ Total\t2
+ ====================
+ """
+ from openpyxl import load_workbook # type: ignore
+
+ xlsx_file = BytesIO(file_bytes)
+ wb = load_workbook(xlsx_file)
+
+ def escape_cell(cell_value: str | int | float | None) -> str:
+ """Escape characters that would break tab-delimited layout.
+
+ Escape order is critical: backslashes first, then tabs/newlines.
+ This prevents double-escaping issues.
+
+ Args:
+ cell_value: The cell value to escape (can be None, str, int, or float)
+
+ Returns:
+ str: Escaped cell value safe for tab-delimited format
+ """
+ if cell_value is None:
+ return ""
+ text = str(cell_value)
+ # CRITICAL: Escape backslash first to avoid double-escaping
+ return (
+ text.replace("\\", "\\\\") # Must be first: \ -> \\
+ .replace("\t", "\\t") # Tab -> \t (visible)
+ .replace("\r\n", "\\n") # Windows newline -> \n
+ .replace("\r", "\\n") # Mac newline -> \n
+ .replace("\n", "\\n") # Unix newline -> \n
+ )
+
+ def escape_sheet_title(title: str) -> str:
+ """Escape sheet title to prevent formatting issues in separators.
+
+ Args:
+ title: Original sheet title
+
+ Returns:
+ str: Sanitized sheet title with tabs/newlines replaced
+ """
+ return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")
+
+ content_parts: list[str] = []
+ sheet_separator = "=" * 20
+
+ for idx, sheet in enumerate(wb):
+ if idx > 0:
+ content_parts.append("") # Blank line between sheets for readability
+
+ # Escape sheet title to handle edge cases with special characters
+ safe_title = escape_sheet_title(sheet.title)
+ content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")
+
+ # Use sheet.max_column to get the maximum column width directly
+ max_columns = sheet.max_column if sheet.max_column else 0
+
+ # Extract rows with consistent width to preserve column alignment
+ for row in sheet.iter_rows(values_only=True):
+ row_parts = []
+
+ # Build row up to max_columns width
+ for idx in range(max_columns):
+ if idx < len(row):
+ row_parts.append(escape_cell(row[idx]))
+ else:
+ row_parts.append("") # Pad short rows
+
+ # Check if row is completely empty
+ if all(part == "" for part in row_parts):
+ # Preserve empty rows as blank lines (maintains row structure)
+ content_parts.append("")
+ else:
+ # Join all columns to maintain consistent column count
+ content_parts.append("\t".join(row_parts))
+
+ # Final separator for symmetry (makes parsing easier)
+ content_parts.append(sheet_separator)
+ return "\n".join(content_parts)
+
+
+async def pipeline_enqueue_file(
+ rag: LightRAG, file_path: Path, track_id: str = None
+) -> tuple[bool, str]:
+ """Add a file to the queue for processing
+
+ Args:
+ rag: LightRAG instance
+ file_path: Path to the saved file
+ track_id: Optional tracking ID, if not provided will be generated
+ Returns:
+ tuple: (success: bool, track_id: str)
+ """
+
+ # Generate track_id if not provided
+ if track_id is None:
+ track_id = generate_track_id("unknown")
+
+ try:
+ content = ""
+ ext = file_path.suffix.lower()
+ file_size = 0
+
+ # Get file size for error reporting
+ try:
+ stat = await asyncio.to_thread(file_path.stat)
+ file_size = stat.st_size
+ except Exception:
+ file_size = 0
+
+ file = None
+ try:
+ async with aiofiles.open(file_path, "rb") as f:
+ file = await f.read()
+ except PermissionError as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]Permission denied - cannot read file",
+ "original_error": str(e),
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(
+ f"[File Extraction]Permission denied reading file: {file_path.name}"
+ )
+ return False, track_id
+ except FileNotFoundError as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]File not found",
+ "original_error": str(e),
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(f"[File Extraction]File not found: {file_path.name}")
+ return False, track_id
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]File reading error",
+ "original_error": str(e),
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(
+ f"[File Extraction]Error reading file {file_path.name}: {str(e)}"
+ )
+ return False, track_id
+
+ # Process based on file type
+ try:
+ match ext:
+ case (
+ ".txt"
+ | ".md"
+ | ".mdx"
+ | ".html"
+ | ".htm"
+ | ".tex"
+ | ".json"
+ | ".xml"
+ | ".yaml"
+ | ".yml"
+ | ".rtf"
+ | ".odt"
+ | ".epub"
+ | ".csv"
+ | ".log"
+ | ".conf"
+ | ".ini"
+ | ".properties"
+ | ".sql"
+ | ".bat"
+ | ".sh"
+ | ".c"
+ | ".h"
+ | ".cpp"
+ | ".hpp"
+ | ".py"
+ | ".java"
+ | ".js"
+ | ".ts"
+ | ".swift"
+ | ".go"
+ | ".rb"
+ | ".php"
+ | ".css"
+ | ".scss"
+ | ".less"
+ ):
+ try:
+ # Try to decode as UTF-8 (offloaded to thread to avoid blocking the event loop)
+ content = await asyncio.to_thread(file.decode, "utf-8")
+
+ # Validate content
+ if not content or len(content.strip()) == 0:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]Empty file content",
+ "original_error": "File contains no content or only whitespace",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]Empty content in file: {file_path.name}"
+ )
+ return False, track_id
+
+ # Check if content looks like binary data string representation
+ if content.startswith("b'") or content.startswith('b"'):
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]Binary data in text file",
+ "original_error": "File appears to contain binary data representation instead of text",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]File {file_path.name} appears to contain binary data representation instead of text"
+ )
+ return False, track_id
+
+ except UnicodeDecodeError as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]UTF-8 encoding error, please convert it to UTF-8 before processing",
+ "original_error": f"File is not valid UTF-8 encoded text: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
+ )
+ return False, track_id
+
+ case ".pdf":
+ try:
+ # Try DOCLING first if configured and available
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and _is_docling_available()
+ ):
+ content = await asyncio.to_thread(
+ _convert_with_docling, file_path
+ )
+ else:
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and not _is_docling_available()
+ ):
+ logger.warning(
+ f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
+ )
+ # Use pypdf (non-blocking via to_thread)
+ content = await asyncio.to_thread(
+ _extract_pdf_pypdf,
+ file,
+ global_args.pdf_decrypt_password,
+ )
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]PDF processing error",
+ "original_error": f"Failed to extract text from PDF: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]Error processing PDF {file_path.name}: {str(e)}"
+ )
+ return False, track_id
+
+ case ".docx":
+ try:
+ # Try DOCLING first if configured and available
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and _is_docling_available()
+ ):
+ content = await asyncio.to_thread(
+ _convert_with_docling, file_path
+ )
+ else:
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and not _is_docling_available()
+ ):
+ logger.warning(
+ f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
+ )
+ # Use python-docx (non-blocking via to_thread)
+ content = await asyncio.to_thread(_extract_docx, file)
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]DOCX processing error",
+ "original_error": f"Failed to extract text from DOCX: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]Error processing DOCX {file_path.name}: {str(e)}"
+ )
+ return False, track_id
+
+ case ".pptx":
+ try:
+ # Try DOCLING first if configured and available
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and _is_docling_available()
+ ):
+ content = await asyncio.to_thread(
+ _convert_with_docling, file_path
+ )
+ else:
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and not _is_docling_available()
+ ):
+ logger.warning(
+ f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
+ )
+ # Use python-pptx (non-blocking via to_thread)
+ content = await asyncio.to_thread(_extract_pptx, file)
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]PPTX processing error",
+ "original_error": f"Failed to extract text from PPTX: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]Error processing PPTX {file_path.name}: {str(e)}"
+ )
+ return False, track_id
+
+ case ".xlsx":
+ try:
+ # Try DOCLING first if configured and available
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and _is_docling_available()
+ ):
+ content = await asyncio.to_thread(
+ _convert_with_docling, file_path
+ )
+ else:
+ if (
+ global_args.document_loading_engine == "DOCLING"
+ and not _is_docling_available()
+ ):
+ logger.warning(
+ f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
+ )
+ # Use openpyxl (non-blocking via to_thread)
+ content = await asyncio.to_thread(_extract_xlsx, file)
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]XLSX processing error",
+ "original_error": f"Failed to extract text from XLSX: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(
+ error_files, track_id
+ )
+ logger.error(
+ f"[File Extraction]Error processing XLSX {file_path.name}: {str(e)}"
+ )
+ return False, track_id
+
+ case _:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": f"[File Extraction]Unsupported file type: {ext}",
+ "original_error": f"File extension {ext} is not supported",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(
+ f"[File Extraction]Unsupported file type: {file_path.name} (extension {ext})"
+ )
+ return False, track_id
+
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]File format processing error",
+ "original_error": f"Unexpected error during file extracting: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(
+ f"[File Extraction]Unexpected error during {file_path.name} extracting: {str(e)}"
+ )
+ return False, track_id
+
+ # Insert into the RAG queue
+ if content:
+ # Check if content contains only whitespace characters
+ if not content.strip():
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "[File Extraction]File contains only whitespace",
+ "original_error": "File content contains only whitespace characters",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.warning(
+ f"[File Extraction]File contains only whitespace characters: {file_path.name}"
+ )
+ return False, track_id
+
+ try:
+ await rag.apipeline_enqueue_documents(
+ content, file_paths=file_path.name, track_id=track_id
+ )
+
+ logger.info(
+ f"Successfully extracted and enqueued file: {file_path.name}"
+ )
+
+ # Move file to __enqueued__ directory after enqueuing
+ try:
+ enqueued_dir = file_path.parent / "__enqueued__"
+ await asyncio.to_thread(enqueued_dir.mkdir, exist_ok=True)
+
+ # Generate unique filename to avoid conflicts
+ unique_filename = get_unique_filename_in_enqueued(
+ enqueued_dir, file_path.name
+ )
+ target_path = enqueued_dir / unique_filename
+
+ # Move the file
+ await asyncio.to_thread(file_path.rename, target_path)
+ logger.debug(
+ f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
+ )
+
+ except Exception as move_error:
+ logger.error(
+ f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
+ )
+ # Don't affect the main function's success status
+
+ return True, track_id
+
+ except Exception as e:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "Document enqueue error",
+ "original_error": f"Failed to enqueue document: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(f"Error enqueueing document {file_path.name}: {str(e)}")
+ return False, track_id
+ else:
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "No content extracted",
+ "original_error": "No content could be extracted from file",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(f"No content extracted from file: {file_path.name}")
+ return False, track_id
+
+ except Exception as e:
+ # Catch-all for any unexpected errors
+ try:
+ file_size = file_path.stat().st_size if file_path.exists() else 0
+ except Exception:
+ file_size = 0
+
+ error_files = [
+ {
+ "file_path": str(file_path.name),
+ "error_description": "Unexpected processing error",
+ "original_error": f"Unexpected error: {str(e)}",
+ "file_size": file_size,
+ }
+ ]
+ await rag.apipeline_enqueue_error_documents(error_files, track_id)
+ logger.error(f"Enqueuing file {file_path.name} error: {str(e)}")
+ logger.error(traceback.format_exc())
+ return False, track_id
+ finally:
+ if file_path.name.startswith(temp_prefix):
+ try:
+ file_path.unlink()
+ except Exception as e:
+ logger.error(f"Error deleting file {file_path}: {str(e)}")
+
+
async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
    """Enqueue a single file and, if that worked, run the processing pipeline.

    Any exception is logged together with its traceback and then swallowed.

    Args:
        rag: LightRAG instance
        file_path: Path to the saved file
        track_id: Optional tracking ID
    """
    try:
        was_enqueued, _ = await pipeline_enqueue_file(rag, file_path, track_id)
        if not was_enqueued:
            return
        await rag.apipeline_process_enqueue_documents()
    except Exception as e:
        logger.error(f"Error indexing file {file_path.name}: {str(e)}")
        logger.error(traceback.format_exc())
+
+
async def pipeline_index_files(
    rag: LightRAG, file_paths: List[Path], track_id: str = None
):
    """Enqueue several files one after another, then process the queue once.

    Files are handled in the order given by ``get_pinyin_sort_key`` applied to
    each path's string form. The queue is processed only when at least one
    file was enqueued successfully. Exceptions are logged with their traceback
    and swallowed.

    Args:
        rag: LightRAG instance
        file_paths: Paths to the files to index
        track_id: Optional tracking ID to pass to all files
    """
    if not file_paths:
        return

    def pinyin_key(path):
        return get_pinyin_sort_key(str(path))

    try:
        any_enqueued = False

        # One file at a time, all sharing the same track_id
        for path in sorted(file_paths, key=pinyin_key):
            ok, _ = await pipeline_enqueue_file(rag, path, track_id)
            if ok:
                any_enqueued = True

        if any_enqueued:
            await rag.apipeline_process_enqueue_documents()
    except Exception as e:
        logger.error(f"Error indexing files: {str(e)}")
        logger.error(traceback.format_exc())
+
+
async def pipeline_index_texts(
    rag: LightRAG,
    texts: List[str],
    file_sources: List[str] = None,
    track_id: str = None,
):
    """Enqueue a list of texts and run the processing pipeline.

    When ``file_sources`` is given, each entry is passed through
    ``normalize_file_path``; a list shorter than ``texts`` is padded with
    ``UNKNOWN_FILE_SOURCE``. Does nothing when ``texts`` is empty.

    Args:
        rag: LightRAG instance
        texts: The texts to index
        file_sources: Sources of the texts
        track_id: Optional tracking ID

    Raises:
        ValueError: If more file sources than texts are supplied.
    """
    if not texts:
        return

    sources: list[str] | None = None
    if file_sources:
        sources = [normalize_file_path(entry) for entry in file_sources]
        shortfall = len(texts) - len(sources)
        if shortfall < 0:
            raise ValueError("Number of file sources must not exceed number of texts")
        if shortfall > 0:
            sources.extend([UNKNOWN_FILE_SOURCE] * shortfall)

    await rag.apipeline_enqueue_documents(
        input=texts, file_paths=sources, track_id=track_id
    )
    await rag.apipeline_process_enqueue_documents()
+
+
async def run_scanning_process(
    rag: LightRAG, doc_manager: DocumentManager, track_id: str = None
):
    """Background task that scans for new files and indexes them.

    Files whose doc-status record already has status ``"processed"`` are
    skipped with a warning; all others are passed to ``pipeline_index_files``.
    When the scan finds nothing, the processing pipeline is still triggered so
    documents already in the queue get handled. Exceptions are logged with
    their traceback and swallowed.

    Args:
        rag: LightRAG instance
        doc_manager: DocumentManager instance
        track_id: Optional tracking ID to pass to all scanned files
    """
    try:
        new_files = doc_manager.scan_directory_for_new_files()
        logger.info(f"Found {len(new_files)} files to index.")

        if not new_files:
            # Nothing new on disk; still drain whatever is waiting in the queue
            logger.info(
                "No upload file found, check if there are any documents in the queue..."
            )
            await rag.apipeline_process_enqueue_documents()
            return

        # Split the scan result into files to index and files already processed
        to_index = []
        already_done = []
        for candidate in new_files:
            doc_name = candidate.name
            record = await rag.doc_status.get_doc_by_file_path(doc_name)

            if record and record.get("status") == "processed":
                already_done.append(doc_name)
                logger.warning(f"Skipping already processed file: {doc_name}")
                continue
            to_index.append(candidate)

        if not to_index:
            logger.info("No files to process after filtering already processed files.")
            return

        await pipeline_index_files(rag, to_index, track_id)
        if already_done:
            logger.info(
                f"Scanning process completed: {len(to_index)} files Processed {len(already_done)} skipped."
            )
        else:
            logger.info(
                f"Scanning process completed: {len(to_index)} files Processed."
            )

    except Exception as e:
        logger.error(f"Error during scanning process: {str(e)}")
        logger.error(traceback.format_exc())
+
+
async def background_delete_documents(
    rag: LightRAG,
    doc_manager: DocumentManager,
    doc_ids: List[str],
    delete_file: bool = False,
    delete_llm_cache: bool = False,
):
    """Background task to delete multiple documents

    Marks the shared pipeline status as busy, deletes the documents one by one
    through ``rag.adelete_by_doc_id`` and publishes progress messages to the
    pipeline status. If the pipeline is already busy the task logs a warning
    and returns without deleting anything. A cancellation request found at the
    start of an iteration stops the loop and counts the remaining documents as
    failed. On exit the busy and cancellation flags are reset, and the
    document-processing pipeline is started if ``request_pending`` is set.

    Args:
        rag: LightRAG instance
        doc_manager: DocumentManager instance; its ``input_dir`` is where files
            are looked up for removal
        doc_ids: IDs of the documents to delete
        delete_file: Also remove the document's file from ``input_dir`` and
            matching files from ``input_dir/__enqueued__``
        delete_llm_cache: Forwarded to ``rag.adelete_by_doc_id``
    """
    from glob import escape as glob_escape

    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_namespace_lock,
    )

    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=rag.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=rag.workspace
    )

    async def publish(message: str, *, set_latest: bool = True) -> None:
        """Append ``message`` to the status history under the status lock.

        Must not be called while the lock is already held.
        """
        async with pipeline_status_lock:
            if set_latest:
                pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

    async def remove_document_files(stored_path: str) -> None:
        """Delete the document's file from input_dir and __enqueued__, reporting each outcome."""
        try:
            deleted_files = []
            # SECURITY: validate the path to prevent arbitrary file deletion
            safe_file_path = validate_file_path_security(
                stored_path, doc_manager.input_dir
            )

            if safe_file_path is None:
                # Security violation detected - log and skip file deletion
                security_msg = f"Security violation: Unsafe file path detected for deletion - {stored_path}"
                logger.warning(security_msg)
                await publish(security_msg)
            else:
                # check and delete files from input_dir directory
                if safe_file_path.exists():
                    try:
                        safe_file_path.unlink()
                        deleted_files.append(safe_file_path.name)
                        file_delete_msg = (
                            f"Successfully deleted input_dir file: {stored_path}"
                        )
                        logger.info(file_delete_msg)
                        await publish(file_delete_msg)
                    except Exception as file_error:
                        file_error_msg = f"Failed to delete input_dir file {stored_path}: {str(file_error)}"
                        logger.debug(file_error_msg)
                        await publish(file_error_msg)

                # Also check and delete files from __enqueued__ directory
                enqueued_dir = doc_manager.input_dir / "__enqueued__"
                if enqueued_dir.exists():
                    base_name = Path(stored_path).stem
                    extension = Path(stored_path).suffix

                    # Escape glob metacharacters so names containing [, ], * or ?
                    # are matched literally; only the "*" between stem and
                    # suffix acts as a wildcard.
                    # NOTE(review): the wildcard also matches unrelated files
                    # that merely share the stem as a prefix - confirm against
                    # the naming scheme of get_unique_filename_in_enqueued.
                    pattern = f"{glob_escape(base_name)}*{glob_escape(extension)}"
                    for enqueued_file in enqueued_dir.glob(pattern):
                        # Additional security check: ensure enqueued file is within enqueued directory
                        safe_enqueued_path = validate_file_path_security(
                            enqueued_file.name, enqueued_dir
                        )
                        if safe_enqueued_path is not None:
                            try:
                                enqueued_file.unlink()
                                deleted_files.append(enqueued_file.name)
                                logger.info(
                                    f"Successfully deleted enqueued file: {enqueued_file.name}"
                                )
                            except Exception as enqueued_error:
                                file_error_msg = f"Failed to delete enqueued file {enqueued_file.name}: {str(enqueued_error)}"
                                logger.debug(file_error_msg)
                                await publish(file_error_msg)
                        else:
                            security_msg = f"Security violation: Unsafe enqueued file path detected - {enqueued_file.name}"
                            logger.warning(security_msg)

            if not deleted_files:
                file_error_msg = (
                    f"File deletion skipped, missing or unsafe file: {stored_path}"
                )
                logger.warning(file_error_msg)
                await publish(file_error_msg)

        except Exception as file_error:
            file_error_msg = f"Failed to delete file {stored_path}: {str(file_error)}"
            logger.error(file_error_msg)
            await publish(file_error_msg)

    total_docs = len(doc_ids)
    successful_deletions = []
    failed_deletions = []

    # Double-check pipeline status before proceeding
    async with pipeline_status_lock:
        if pipeline_status.get("busy", False):
            logger.warning("Error: Unexpected pipeline busy state, aborting deletion.")
            return  # Abort deletion operation

        # Set pipeline status to busy for deletion
        pipeline_status.update(
            {
                "busy": True,
                # Job name can not be changed, it's verified in adelete_by_doc_id()
                "job_name": f"Deleting {total_docs} Documents",
                "job_start": datetime.now().isoformat(),
                "docs": total_docs,
                "batchs": total_docs,
                "cur_batch": 0,
                "latest_message": "Starting document deletion process",
            }
        )
        # Use slice assignment to clear the list in place
        pipeline_status["history_messages"][:] = ["Starting document deletion process"]
        if delete_llm_cache:
            pipeline_status["history_messages"].append(
                "LLM cache cleanup requested for this deletion job"
            )

    try:
        # Loop through each document ID and delete them one by one
        for i, doc_id in enumerate(doc_ids, 1):
            # Check for cancellation at the start of each document deletion
            async with pipeline_status_lock:
                if pipeline_status.get("cancellation_requested", False):
                    cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
                    logger.info(cancel_msg)
                    pipeline_status["latest_message"] = cancel_msg
                    pipeline_status["history_messages"].append(cancel_msg)
                    # Count the remaining documents as failed
                    # (i - 1 because enumerate starts at 1)
                    failed_deletions.extend(doc_ids[i - 1 :])
                    break  # Exit the loop, remaining documents unchanged

                start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
                logger.info(start_msg)
                pipeline_status["cur_batch"] = i
                pipeline_status["latest_message"] = start_msg
                pipeline_status["history_messages"].append(start_msg)

            # Placeholder shown in error messages if the deletion call itself raises
            file_path = "#"
            try:
                result = await rag.adelete_by_doc_id(
                    doc_id, delete_llm_cache=delete_llm_cache
                )
                file_path = getattr(result, "file_path", "-")
                if result.status == "success":
                    successful_deletions.append(doc_id)
                    success_msg = (
                        f"Document deleted {i}/{total_docs}: {doc_id}[{file_path}]"
                    )
                    logger.info(success_msg)
                    await publish(success_msg, set_latest=False)

                    # Handle file deletion if requested and file_path is available
                    if (
                        delete_file
                        and result.file_path
                        and result.file_path != "unknown_source"
                    ):
                        await remove_document_files(result.file_path)
                    elif delete_file:
                        no_file_msg = (
                            f"File deletion skipped, missing file path: {doc_id}"
                        )
                        logger.warning(no_file_msg)
                        await publish(no_file_msg)
                else:
                    failed_deletions.append(doc_id)
                    error_msg = f"Failed to delete {i}/{total_docs}: {doc_id}[{file_path}] - {result.message}"
                    logger.error(error_msg)
                    await publish(error_msg)

            except Exception as e:
                failed_deletions.append(doc_id)
                error_msg = f"Error deleting document {i}/{total_docs}: {doc_id}[{file_path}] - {str(e)}"
                logger.error(error_msg)
                logger.error(traceback.format_exc())
                await publish(error_msg)

    except Exception as e:
        error_msg = f"Critical error during batch deletion: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        await publish(error_msg, set_latest=False)
    finally:
        # Final summary and check for pending requests
        async with pipeline_status_lock:
            pipeline_status["busy"] = False
            # NOTE(review): this resets "pending_requests" while the check below
            # reads "request_pending" - confirm which key is intended.
            pipeline_status["pending_requests"] = False
            pipeline_status["cancellation_requested"] = False  # Always reset
            completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
            pipeline_status["latest_message"] = completion_msg
            pipeline_status["history_messages"].append(completion_msg)

            # Check if there are pending document indexing requests
            has_pending_request = pipeline_status.get("request_pending", False)

        # If there are pending requests, start document processing pipeline
        if has_pending_request:
            try:
                logger.info(
                    "Processing pending document indexing requests after deletion"
                )
                await rag.apipeline_process_enqueue_documents()
            except Exception as e:
                logger.error(f"Error processing pending documents after deletion: {e}")
+
+
+def create_document_routes(
+ rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
+):
+ # Create combined auth dependency for document routes
+ combined_auth = get_combined_auth_dependency(api_key)
+
+ @router.post(
+ "/scan", response_model=ScanResponse, dependencies=[Depends(combined_auth)]
+ )
+ async def scan_for_new_documents(background_tasks: BackgroundTasks):
+ """
+ Trigger the scanning process for new documents.
+
+ This endpoint initiates a background task that scans the input directory for new documents
+ and processes them. If a scanning process is already running, it returns a status indicating
+ that fact.
+
+ Returns:
+ ScanResponse: A response object containing the scanning status and track_id
+ """
+ # Generate track_id with "scan" prefix for scanning operation
+ track_id = generate_track_id("scan")
+
+ # Start the scanning process in the background with track_id
+ background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
+ return ScanResponse(
+ status="scanning_started",
+ message="Scanning process has been initiated in the background",
+ track_id=track_id,
+ )
+
+ @router.post(
+ "/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
+ )
+ async def upload_to_input_dir(
+ background_tasks: BackgroundTasks, file: UploadFile = File(...)
+ ):
+ """
+ Upload a file to the input directory and index it.
+
+ This API endpoint accepts a file through an HTTP POST request, checks if the
+ uploaded file is of a supported type, saves it in the specified input directory,
+ indexes it for retrieval, and returns a success status with relevant details.
+
+ **File Size Limit:**
+ - Configurable via `MAX_UPLOAD_SIZE` environment variable (default: 100MB)
+ - Set to `None` or `0` for unlimited upload size
+ - Returns HTTP 413 (Request Entity Too Large) if file exceeds limit
+
+ **Duplicate Detection Behavior:**
+
+ This endpoint handles two types of duplicate scenarios differently:
+
+ 1. **Filename Duplicate (Synchronous Detection)**:
+ - Detected immediately before file processing
+ - Returns `status="duplicated"` with the existing document's track_id
+ - Two cases:
+ - If filename exists in document storage: returns existing track_id
+ - If filename exists in file system only: returns empty track_id ("")
+
+ 2. **Content Duplicate (Asynchronous Detection)**:
+ - Detected during background processing after content extraction
+ - Returns `status="success"` with a new track_id immediately
+ - The duplicate is detected later when processing the file content
+ - Use `/documents/track_status/{track_id}` to check the final result:
+ - Document will have `status="FAILED"`
+ - `error_msg` contains "Content already exists. Original doc_id: xxx"
+ - `metadata.is_duplicate=true` with reference to original document
+ - `metadata.original_doc_id` points to the existing document
+ - `metadata.original_track_id` shows the original upload's track_id
+
+ **Why Different Behavior?**
+ - Filename check is fast (simple lookup), done synchronously
+ - Content extraction is expensive (PDF/DOCX parsing), done asynchronously
+ - This design prevents blocking the client during expensive operations
+
+ Args:
+ background_tasks: FastAPI BackgroundTasks for async processing
+ file (UploadFile): The file to be uploaded. It must have an allowed extension.
+
+ Returns:
+ InsertResponse: A response object containing the upload status and a message.
+ - status="success": File accepted and queued for processing
+ - status="duplicated": Filename already exists (see track_id for existing document)
+
+ Raises:
+ HTTPException: If the file type is not supported (400), file too large (413), or other errors occur (500).
+ """
+ try:
+ # Sanitize filename to prevent Path Traversal attacks
+ safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)
+
+ if not doc_manager.is_supported_file(safe_filename):
+ raise HTTPException(
+ status_code=400,
+ detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
+ )
+
+ # Check file size limit (if configured)
+ if (
+ global_args.max_upload_size is not None
+ and global_args.max_upload_size > 0
+ ):
+ # Safe access to file size (not available in older Starlette versions)
+ file_size = getattr(file, "size", None)
+
+ # Pre-flight size check (only if size is available)
+ if file_size is not None:
+ if file_size > global_args.max_upload_size:
+ raise HTTPException(
+ status_code=413,
+ detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {file_size / 1024 / 1024:.1f}MB",
+ )
+ else:
+ # If size not available, we'll check during streaming
+ logger.debug(
+ f"File size not available in UploadFile for {safe_filename}, will check during streaming"
+ )
+
+ # Check if filename already exists in doc_status storage
+ existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
+ if existing_doc_data:
+ # Get document status and track_id from existing document
+ status = existing_doc_data.get("status", "unknown")
+ # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
+ existing_track_id = existing_doc_data.get("track_id") or ""
+ return InsertResponse(
+ status="duplicated",
+ message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
+ track_id=existing_track_id,
+ )
+
+ file_path = doc_manager.input_dir / safe_filename
+ # Check if file already exists in file system
+ if file_path.exists():
+ return InsertResponse(
+ status="duplicated",
+ message=f"File '{safe_filename}' already exists in the input directory.",
+ track_id="",
+ )
+
+ # Async streaming write with size check
+ bytes_written = 0
+ chunk_size = 1024 * 1024 # 1MB chunks
+ needs_cleanup = False
+
+ async with aiofiles.open(file_path, "wb") as out_file:
+ while True:
+ # Read chunk from upload stream
+ chunk = await file.read(chunk_size)
+ if not chunk:
+ break
+
+ # Check size limit during streaming (if not checked before)
+ if (
+ global_args.max_upload_size is not None
+ and global_args.max_upload_size > 0
+ ):
+ bytes_written += len(chunk)
+ if bytes_written > global_args.max_upload_size:
+ needs_cleanup = True
+ break
+
+ # Write chunk to file
+ await out_file.write(chunk)
+
+ # Cleanup after file is closed
+ if needs_cleanup:
+ try:
+ file_path.unlink()
+ except Exception as cleanup_error:
+ logger.error(
+ f"Error cleaning up oversized file {safe_filename}: {cleanup_error}"
+ )
+
+ raise HTTPException(
+ status_code=413,
+ detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {bytes_written / 1024 / 1024:.1f}MB",
+ )
+
+ track_id = generate_track_id("upload")
+
+ # Add to background tasks and get track_id
+ background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
+
+ return InsertResponse(
+ status="success",
+ message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
+ track_id=track_id,
+ )
+
+ except HTTPException:
+ # Re-raise HTTP exceptions (400, 413, etc.)
+ raise
+ except Exception as e:
+ logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
+ logger.error(traceback.format_exc())
+ raise HTTPException(status_code=500, detail=str(e))
+
+    @router.post(
+        "/text", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
+    )
+    async def insert_text(
+        request: InsertTextRequest, background_tasks: BackgroundTasks
+    ):
+        """
+        Queue a single text for indexing, rejecting known duplicates up front.
+
+        Two duplicate checks run before anything is scheduled:
+
+        1. If `request.file_source` is non-blank and is not "unknown_source", it is
+           looked up in doc_status by file path.
+        2. The sanitized text is hashed into a "doc-" prefixed ID and that ID is
+           looked up in doc_status.
+
+        A hit in either check returns `status="duplicated"` together with the stored
+        record's track_id ("" when the record has none). Otherwise a fresh track_id
+        is generated and indexing is handed to a background task.
+
+        Args:
+            request (InsertTextRequest): Request body carrying `text` and `file_source`.
+            background_tasks: FastAPI BackgroundTasks used to schedule the indexing.
+
+        Returns:
+            InsertResponse: `status="success"` with a new track_id, or
+            `status="duplicated"` with the existing document's track_id.
+
+        Raises:
+            HTTPException: 500 if any step above fails.
+        """
+        try:
+            # Blank sources and the "unknown_source" placeholder are never treated as duplicates
+            has_named_source = bool(
+                request.file_source
+                and request.file_source.strip()
+                and request.file_source != "unknown_source"
+            )
+            prior_by_source = (
+                await rag.doc_status.get_doc_by_file_path(request.file_source)
+                if has_named_source
+                else None
+            )
+            if prior_by_source:
+                status = prior_by_source.get("status", "unknown")
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
+                    # `or ""` covers both a missing key and a stored None (e.g. legacy rows)
+                    track_id=prior_by_source.get("track_id") or "",
+                )
+
+            # The document ID is derived from the sanitized content, so identical text maps to one ID
+            content_doc_id = compute_mdhash_id(
+                sanitize_text_for_encoding(request.text), prefix="doc-"
+            )
+            prior_by_content = await rag.doc_status.get_by_id(content_doc_id)
+            if prior_by_content:
+                status = prior_by_content.get("status", "unknown")
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
+                    track_id=prior_by_content.get("track_id") or "",
+                )
+
+            # Nothing matched: hand the text to the background indexer under a new track_id
+            track_id = generate_track_id("insert")
+            background_tasks.add_task(
+                pipeline_index_texts,
+                rag,
+                [request.text],
+                file_sources=[request.file_source],
+                track_id=track_id,
+            )
+            return InsertResponse(
+                status="success",
+                message="Text successfully received. Processing will continue in background.",
+                track_id=track_id,
+            )
+        except Exception as e:
+            logger.error(f"Error /documents/text: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.post(
+        "/texts",
+        response_model=InsertResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def insert_texts(
+        request: InsertTextsRequest, background_tasks: BackgroundTasks
+    ):
+        """
+        Queue a batch of texts for indexing, rejecting the batch on the first duplicate.
+
+        Every entry of `request.file_sources` that is non-blank and not
+        "unknown_source" is looked up in doc_status by file path; then every text is
+        sanitized, hashed into a "doc-" prefixed ID and looked up by that ID. The
+        first hit ends the request with `status="duplicated"` and nothing is
+        scheduled. If no check matches, the whole batch is handed to a background
+        task under one new track_id.
+
+        Args:
+            request (InsertTextsRequest): Request body carrying `texts` and `file_sources`.
+            background_tasks: FastAPI BackgroundTasks used to schedule the indexing.
+
+        Returns:
+            InsertResponse: `status="success"` with a new track_id, or
+            `status="duplicated"` with the matching document's track_id
+            ("" when the stored record has none).
+
+        Raises:
+            HTTPException: 500 if any step above fails.
+        """
+        try:
+            # Pass 1: reject the batch if any named source is already known
+            for file_source in request.file_sources or []:
+                if (
+                    not file_source
+                    or not file_source.strip()
+                    or file_source == "unknown_source"
+                ):
+                    continue
+
+                prior_by_source = await rag.doc_status.get_doc_by_file_path(
+                    file_source
+                )
+                if not prior_by_source:
+                    continue
+
+                status = prior_by_source.get("status", "unknown")
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
+                    # `or ""` covers both a missing key and a stored None (e.g. legacy rows)
+                    track_id=prior_by_source.get("track_id") or "",
+                )
+
+            # Pass 2: reject the batch if any text hashes to an already stored document ID
+            for text in request.texts:
+                content_doc_id = compute_mdhash_id(
+                    sanitize_text_for_encoding(text), prefix="doc-"
+                )
+                prior_by_content = await rag.doc_status.get_by_id(content_doc_id)
+                if not prior_by_content:
+                    continue
+
+                status = prior_by_content.get("status", "unknown")
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
+                    track_id=prior_by_content.get("track_id") or "",
+                )
+
+            # No duplicates: schedule the whole batch under a single track_id
+            track_id = generate_track_id("insert")
+            background_tasks.add_task(
+                pipeline_index_texts,
+                rag,
+                request.texts,
+                file_sources=request.file_sources,
+                track_id=track_id,
+            )
+            return InsertResponse(
+                status="success",
+                message="Texts successfully received. Processing will continue in background.",
+                track_id=track_id,
+            )
+        except Exception as e:
+            logger.error(f"Error /documents/texts: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.delete(
+        "", response_model=ClearDocumentsResponse, dependencies=[Depends(combined_auth)]
+    )
+    async def clear_documents():
+        """
+        Drop every storage backend and delete the files in the input directory.
+
+        The pipeline "busy" flag is claimed under the pipeline status lock before any
+        work starts and is always released in a `finally` block, so a failed run
+        cannot leave the pipeline marked busy. Progress is appended to the shared
+        `history_messages` list as the job advances.
+
+        Only files directly inside the input directory are removed; files in
+        subdirectories are preserved.
+
+        Returns:
+            ClearDocumentsResponse: A response object containing the status and message.
+                - status="success": No storage drop and no file deletion reported an error.
+                - status="partial_success": At least one storage drop or file deletion failed.
+                - status="busy": The pipeline was already busy; nothing was touched.
+                - status="fail": Every storage drop failed; file deletion was skipped.
+                - message: Summary of the run, including the number of deleted files.
+
+        Raises:
+            HTTPException: 500 if an unexpected error interrupts the clearing process.
+        """
+        from lightrag.kg.shared_storage import (
+            get_namespace_data,
+            get_namespace_lock,
+        )
+
+        # Get pipeline status and lock
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=rag.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=rag.workspace
+        )
+
+        def _record_history(message: str) -> None:
+            """Append a progress line to the shared history list when it exists."""
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(message)
+
+        # Check and set status with lock
+        async with pipeline_status_lock:
+            if pipeline_status.get("busy", False):
+                return ClearDocumentsResponse(
+                    status="busy",
+                    message="Cannot clear documents while pipeline is busy",
+                )
+            # Set busy to true
+            pipeline_status.update(
+                {
+                    "busy": True,
+                    "job_name": "Clearing Documents",
+                    "job_start": datetime.now().isoformat(),
+                    "docs": 0,
+                    "batchs": 0,
+                    "cur_batch": 0,
+                    "request_pending": False,  # Clear any previous request
+                    "latest_message": "Starting document clearing process",
+                }
+            )
+            # Cleaning history_messages without breaking it as a shared list object
+            del pipeline_status["history_messages"][:]
+            pipeline_status["history_messages"].append(
+                "Starting document clearing process"
+            )
+
+        try:
+            # Keep only the storages that are actually configured. The drop results
+            # are paired with this filtered list, so a None entry can no longer
+            # shift the pairing between a result and the storage that produced it.
+            active_storages = [
+                storage
+                for storage in (
+                    rag.text_chunks,
+                    rag.full_docs,
+                    rag.full_entities,
+                    rag.full_relations,
+                    rag.entity_chunks,
+                    rag.relation_chunks,
+                    rag.entities_vdb,
+                    rag.relationships_vdb,
+                    rag.chunks_vdb,
+                    rag.chunk_entity_relation_graph,
+                    rag.doc_status,
+                )
+                if storage is not None
+            ]
+
+            # Log storage drop start
+            _record_history("Starting to drop storage components")
+
+            # Run all drops concurrently; failures come back as exception objects
+            drop_results = await asyncio.gather(
+                *[storage.drop() for storage in active_storages],
+                return_exceptions=True,
+            )
+
+            # Check for errors and log results
+            errors = []
+            storage_success_count = 0
+            storage_error_count = 0
+
+            for storage, result in zip(active_storages, drop_results):
+                storage_name = storage.__class__.__name__
+                if isinstance(result, Exception):
+                    error_msg = f"Error dropping {storage_name}: {str(result)}"
+                    errors.append(error_msg)
+                    logger.error(error_msg)
+                    storage_error_count += 1
+                else:
+                    namespace = storage.namespace
+                    workspace = storage.workspace
+                    logger.info(
+                        f"Successfully dropped {storage_name}: {workspace}/{namespace}"
+                    )
+                    storage_success_count += 1
+
+            # Log storage drop results
+            if storage_error_count > 0:
+                _record_history(
+                    f"Dropped {storage_success_count} storage components with {storage_error_count} errors"
+                )
+            else:
+                _record_history(
+                    f"Successfully dropped all {storage_success_count} storage components"
+                )
+
+            # If all storage operations failed, return error status and don't proceed with file deletion
+            if storage_success_count == 0 and storage_error_count > 0:
+                error_message = "All storage drop operations failed. Aborting document clearing process."
+                logger.error(error_message)
+                _record_history(error_message)
+                return ClearDocumentsResponse(status="fail", message=error_message)
+
+            # Log file deletion start
+            _record_history("Starting to delete files in input directory")
+
+            # Delete only files in the current directory, preserve files in subdirectories
+            deleted_files_count = 0
+            file_errors_count = 0
+
+            for file_path in doc_manager.input_dir.glob("*"):
+                if file_path.is_file():
+                    try:
+                        file_path.unlink()
+                        deleted_files_count += 1
+                    except Exception as e:
+                        logger.error(f"Error deleting file {file_path}: {str(e)}")
+                        file_errors_count += 1
+
+            # Log file deletion results. The failure is added to `errors` whether or
+            # not a history list exists, so the returned status never depends on logging.
+            if file_errors_count > 0:
+                _record_history(
+                    f"Deleted {deleted_files_count} files with {file_errors_count} errors"
+                )
+                errors.append(f"Failed to delete {file_errors_count} files")
+            else:
+                _record_history(f"Successfully deleted {deleted_files_count} files")
+
+            # Prepare final result message
+            if errors:
+                final_message = f"Cleared documents with some errors. Deleted {deleted_files_count} files."
+                status = "partial_success"
+            else:
+                final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
+                status = "success"
+
+            # Log final result
+            _record_history(final_message)
+
+            # Return response based on results
+            return ClearDocumentsResponse(status=status, message=final_message)
+        except Exception as e:
+            error_msg = f"Error clearing documents: {str(e)}"
+            logger.error(error_msg)
+            logger.error(traceback.format_exc())
+            _record_history(error_msg)
+            raise HTTPException(status_code=500, detail=str(e))
+        finally:
+            # Reset busy status after completion (also runs on the early "fail" return)
+            async with pipeline_status_lock:
+                pipeline_status["busy"] = False
+                completion_msg = "Document clearing process completed"
+                pipeline_status["latest_message"] = completion_msg
+                _record_history(completion_msg)
+
+    @router.get(
+        "/pipeline_status",
+        dependencies=[Depends(combined_auth)],
+        response_model=PipelineStatusResponse,
+    )
+    async def get_pipeline_status() -> PipelineStatusResponse:
+        """
+        Report a snapshot of the shared pipeline status for this workspace.
+
+        The shared status mapping is copied into a plain dict while holding the
+        pipeline status lock, then enriched before being returned:
+
+        - `update_status`: per-namespace update flags, each coerced to a plain bool
+        - `history_messages`: copied into a plain list; when it holds more than
+          1000 entries only the newest 1000 are kept, preceded by a single
+          "[Truncated history messages: x/y]" marker
+        - `job_start`: passed through `format_datetime` when it is set
+
+        Returns:
+            PipelineStatusResponse: Built from the snapshot's keys (for example
+            busy, job_name, job_start, docs, batchs, cur_batch, request_pending,
+            latest_message, history_messages, update_status).
+
+        Raises:
+            HTTPException: If an error occurs while retrieving pipeline status (500)
+        """
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+                get_all_update_flags_status,
+            )
+
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
+
+            # Flags may be wrapper objects exposing `.value` or plain values;
+            # normalise both shapes to real booleans.
+            raw_flags = await get_all_update_flags_status(workspace=rag.workspace)
+            plain_flags = {
+                namespace: [
+                    bool(flag.value) if hasattr(flag, "value") else bool(flag)
+                    for flag in flags
+                ]
+                for namespace, flags in raw_flags.items()
+            }
+
+            # Copy under the lock so the snapshot is a regular dict
+            async with pipeline_status_lock:
+                snapshot = dict(pipeline_status)
+
+            snapshot["update_status"] = plain_flags
+
+            if "history_messages" in snapshot:
+                history = list(snapshot["history_messages"])
+                total_count = len(history)
+
+                if total_count <= 1000:
+                    # Small enough to return in full
+                    snapshot["history_messages"] = history
+                else:
+                    # Keep the newest 1000 lines and say how many were dropped
+                    truncated_count = total_count - 1000
+                    snapshot["history_messages"] = [
+                        f"[Truncated history messages: {truncated_count}/{total_count}]",
+                        *history[-1000:],
+                    ]
+
+            # Normalise a set job_start through the shared datetime formatter
+            if snapshot.get("job_start"):
+                snapshot["job_start"] = format_datetime(snapshot["job_start"])
+
+            return PipelineStatusResponse(**snapshot)
+        except Exception as e:
+            logger.error(f"Error getting pipeline status: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+ # TODO: Deprecated, use /documents/paginated instead
+    @router.get(
+        "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
+    )
+    async def documents() -> DocsStatusesResponse:
+        """
+        List document statuses grouped by status. Deprecated: use /documents/paginated.
+
+        Documents are fetched for each of PENDING, PROCESSING, PREPROCESSED,
+        PROCESSED and FAILED, then emitted round-robin, one document per status per
+        turn, until 1000 documents have been collected or every status is
+        exhausted. The cap keeps the response bounded while no single status can
+        crowd out the others.
+
+        Returns:
+            DocsStatusesResponse: `statuses` maps each DocStatus that has at least
+            one emitted document to its list of DocStatusResponse objects. At most
+            1000 documents are returned in total.
+
+        Raises:
+            HTTPException: If an error occurs while retrieving document statuses (500).
+        """
+        try:
+            statuses = (
+                DocStatus.PENDING,
+                DocStatus.PROCESSING,
+                DocStatus.PREPROCESSED,
+                DocStatus.PROCESSED,
+                DocStatus.FAILED,
+            )
+
+            # One query per status, run concurrently; results come back in `statuses` order
+            results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(
+                *[rag.get_docs_by_status(status) for status in statuses]
+            )
+
+            response = DocsStatusesResponse()
+            max_documents = 1000
+
+            # One (status, [(doc_id, doc_status), ...]) bucket per status
+            buckets = [
+                (status, list(result.items()))
+                for status, result in zip(statuses, results)
+            ]
+            cursors = [0] * len(buckets)  # Next unread position in each bucket
+            emitted = 0
+            turn = 0
+
+            while emitted < max_documents:
+                # Stop as soon as every bucket has been fully consumed
+                if not any(
+                    cursor < len(entries)
+                    for cursor, (_, entries) in zip(cursors, buckets)
+                ):
+                    break
+
+                status, entries = buckets[turn]
+                position = cursors[turn]
+
+                # An exhausted bucket simply forfeits its turn
+                if position < len(entries):
+                    doc_id, doc_status = entries[position]
+
+                    if status not in response.statuses:
+                        response.statuses[status] = []
+
+                    response.statuses[status].append(
+                        DocStatusResponse(
+                            id=doc_id,
+                            content_summary=doc_status.content_summary,
+                            content_length=doc_status.content_length,
+                            status=doc_status.status,
+                            created_at=format_datetime(doc_status.created_at),
+                            updated_at=format_datetime(doc_status.updated_at),
+                            track_id=doc_status.track_id,
+                            chunks_count=doc_status.chunks_count,
+                            error_msg=doc_status.error_msg,
+                            metadata=doc_status.metadata,
+                            file_path=normalize_file_path(doc_status.file_path),
+                        )
+                    )
+
+                    cursors[turn] += 1
+                    emitted += 1
+
+                # Hand the turn to the next status (round-robin)
+                turn = (turn + 1) % len(buckets)
+
+            return response
+        except Exception as e:
+            logger.error(f"Error GET /documents: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    class DeleteDocByIdResponse(BaseModel):
+        """Result payload returned when a document deletion is requested."""
+
+        # One of the three literal outcomes accepted by the response schema
+        status: Literal["deletion_started", "busy", "not_allowed"] = Field(
+            ...,
+            description="Status of the deletion operation",
+        )
+        # Human-readable explanation of the outcome
+        message: str = Field(
+            ...,
+            description="Message describing the operation result",
+        )
+        # Identifier(s) the request referred to
+        doc_id: str = Field(
+            ...,
+            description="The ID of the document to delete",
+        )
+
+    @router.delete(
+        "/delete_document",
+        response_model=DeleteDocByIdResponse,
+        dependencies=[Depends(combined_auth)],
+        summary="Delete a document and all its associated data by its ID.",
+    )
+    async def delete_document(
+        delete_request: DeleteDocRequest,
+        background_tasks: BackgroundTasks,
+    ) -> DeleteDocByIdResponse:
+        """
+        Schedule background deletion of the requested documents.
+
+        The pipeline "busy" flag is read under the pipeline status lock. When it is
+        set the request is refused with `status="busy"`; otherwise
+        `background_delete_documents` is scheduled with the document IDs and the
+        request's `delete_file` and `delete_llm_cache` options, and the endpoint
+        returns immediately without waiting for the deletion to finish.
+
+        Args:
+            delete_request (DeleteDocRequest): The document IDs plus deletion options.
+            background_tasks: FastAPI BackgroundTasks used to schedule the deletion.
+
+        Returns:
+            DeleteDocByIdResponse: The result of the request.
+                - status="deletion_started": The deletion task was scheduled.
+                - status="busy": The pipeline is busy with another operation.
+              `doc_id` holds the requested IDs joined with ", ".
+
+        Raises:
+            HTTPException:
+              - 500: If an unexpected error occurs while scheduling the deletion.
+        """
+        doc_ids = delete_request.doc_ids
+
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+            )
+
+            shared_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            shared_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
+
+            # Read the busy flag while holding the lock
+            async with shared_status_lock:
+                pipeline_busy = shared_status.get("busy", False)
+
+            if pipeline_busy:
+                return DeleteDocByIdResponse(
+                    status="busy",
+                    message="Cannot delete documents while pipeline is busy",
+                    doc_id=", ".join(doc_ids),
+                )
+
+            # Hand the actual deletion to the background task runner
+            background_tasks.add_task(
+                background_delete_documents,
+                rag,
+                doc_manager,
+                doc_ids,
+                delete_request.delete_file,
+                delete_request.delete_llm_cache,
+            )
+
+            return DeleteDocByIdResponse(
+                status="deletion_started",
+                message=f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background.",
+                doc_id=", ".join(doc_ids),
+            )
+
+        except Exception as e:
+            error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(e)}"
+            logger.error(error_msg)
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=error_msg)
+
+    @router.post(
+        "/clear_cache",
+        response_model=ClearCacheResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def clear_cache(request: ClearCacheRequest):
+        """
+        Clear the LLM response cache.
+
+        Calls `rag.aclear_cache()` with no arguments, so no mode filter is applied.
+        The request body is accepted for API compatibility but is never read.
+
+        Args:
+            request (ClearCacheRequest): The request body (ignored for compatibility).
+
+        Returns:
+            ClearCacheResponse: `status="success"` with a confirmation message.
+
+        Raises:
+            HTTPException: If an error occurs during cache clearing (500).
+        """
+        try:
+            # No arguments are passed, so the request body plays no part here
+            await rag.aclear_cache()
+            return ClearCacheResponse(
+                status="success",
+                message="Successfully cleared all cache",
+            )
+        except Exception as e:
+            logger.error(f"Error clearing cache: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.delete(
+        "/delete_entity",
+        response_model=DeletionResult,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def delete_entity(request: DeleteEntityRequest):
+        """
+        Remove an entity from the knowledge graph via `rag.adelete_by_entity`.
+
+        Args:
+            request (DeleteEntityRequest): The request body containing the entity name.
+
+        Returns:
+            DeletionResult: The result object produced by the deletion, with `doc_id`
+            reset to "" because the operation targets an entity, not a document.
+
+        Raises:
+            HTTPException: 404 when the result status is "not_found", 500 when it is
+            "fail" or when an unexpected error occurs.
+        """
+        # Result statuses that are reported to the client as HTTP errors
+        failure_codes = {"not_found": 404, "fail": 500}
+
+        try:
+            result = await rag.adelete_by_entity(entity_name=request.entity_name)
+
+            http_code = failure_codes.get(result.status)
+            if http_code is not None:
+                raise HTTPException(status_code=http_code, detail=result.message)
+
+            # Set doc_id to empty string since this is an entity operation, not document
+            result.doc_id = ""
+            return result
+        except HTTPException:
+            # Already carries the intended status code; pass it through untouched
+            raise
+        except Exception as e:
+            error_msg = f"Error deleting entity '{request.entity_name}': {str(e)}"
+            logger.error(error_msg)
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=error_msg)
+
+    @router.delete(
+        "/delete_relation",
+        response_model=DeletionResult,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def delete_relation(request: DeleteRelationRequest):
+        """
+        Remove the relationship between two entities via `rag.adelete_by_relation`.
+
+        Args:
+            request (DeleteRelationRequest): The request body containing the source and target entity names.
+
+        Returns:
+            DeletionResult: The result object produced by the deletion, with `doc_id`
+            reset to "" because the operation targets a relation, not a document.
+
+        Raises:
+            HTTPException: 404 when the result status is "not_found", 500 when it is
+            "fail" or when an unexpected error occurs.
+        """
+        # Result statuses that are reported to the client as HTTP errors
+        failure_codes = {"not_found": 404, "fail": 500}
+
+        try:
+            result = await rag.adelete_by_relation(
+                source_entity=request.source_entity,
+                target_entity=request.target_entity,
+            )
+
+            http_code = failure_codes.get(result.status)
+            if http_code is not None:
+                raise HTTPException(status_code=http_code, detail=result.message)
+
+            # Set doc_id to empty string since this is a relation operation, not document
+            result.doc_id = ""
+            return result
+        except HTTPException:
+            # Already carries the intended status code; pass it through untouched
+            raise
+        except Exception as e:
+            error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(e)}"
+            logger.error(error_msg)
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=error_msg)
+
+    @router.get(
+        "/track_status/{track_id}",
+        response_model=TrackStatusResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def get_track_status(track_id: str) -> TrackStatusResponse:
+        """
+        Look up every document recorded under a tracking ID.
+
+        The ID is stripped of surrounding whitespace before the lookup, and the
+        stripped value is echoed back in the response.
+
+        Args:
+            track_id (str): The tracking ID returned from upload, text, or texts endpoints
+
+        Returns:
+            TrackStatusResponse: A response object containing:
+                - track_id: The stripped tracking ID
+                - documents: One DocStatusResponse per matching document
+                - total_count: Number of entries in `documents`
+                - status_summary: Document count keyed by `str(status)`
+
+        Raises:
+            HTTPException: If track_id is empty or blank (400) or an error occurs (500).
+        """
+        try:
+            # Reject empty and whitespace-only IDs before touching storage
+            if not (track_id and track_id.strip()):
+                raise HTTPException(status_code=400, detail="Track ID cannot be empty")
+
+            track_id = track_id.strip()
+
+            docs_by_track_id = await rag.aget_docs_by_track_id(track_id)
+
+            # Convert each stored status record into its API representation
+            documents = [
+                DocStatusResponse(
+                    id=doc_id,
+                    content_summary=doc_status.content_summary,
+                    content_length=doc_status.content_length,
+                    status=doc_status.status,
+                    created_at=format_datetime(doc_status.created_at),
+                    updated_at=format_datetime(doc_status.updated_at),
+                    track_id=doc_status.track_id,
+                    chunks_count=doc_status.chunks_count,
+                    error_msg=doc_status.error_msg,
+                    metadata=doc_status.metadata,
+                    file_path=normalize_file_path(doc_status.file_path),
+                )
+                for doc_id, doc_status in docs_by_track_id.items()
+            ]
+
+            # Tally documents per status; str() lets enum and plain-string statuses share one path
+            status_summary = {}
+            for doc_status in docs_by_track_id.values():
+                status_key = str(doc_status.status)
+                status_summary[status_key] = status_summary.get(status_key, 0) + 1
+
+            return TrackStatusResponse(
+                track_id=track_id,
+                documents=documents,
+                total_count=len(documents),
+                status_summary=status_summary,
+            )
+
+        except HTTPException:
+            # Keep the 400 raised above instead of wrapping it as a 500
+            raise
+        except Exception as e:
+            logger.error(f"Error getting track status for {track_id}: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.post(
+        "/paginated",
+        response_model=PaginatedDocsResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def get_documents_paginated(
+        request: DocumentsRequest,
+    ) -> PaginatedDocsResponse:
+        """
+        Get documents with pagination support.
+
+        This endpoint retrieves documents with pagination, filtering, and sorting capabilities.
+        It provides better performance for large document collections by loading only the
+        requested page of data.
+
+        Args:
+            request (DocumentsRequest): The request body containing pagination parameters
+
+        Returns:
+            PaginatedDocsResponse: A response object containing:
+                - documents: List of documents for the current page
+                - pagination: Pagination information (page, total_count, etc.)
+                - status_counts: Count of documents by status for all documents
+
+        Raises:
+            HTTPException: If an error occurs while retrieving documents (500).
+        """
+        # Short random id that ties together every timing line of this request.
+        trace_id = uuid4().hex[:8]
+        started_at = time.perf_counter()
+        if request.status_filter is None:
+            filter_label = None
+        else:
+            filter_label = request.status_filter.value
+
+        performance_timing_log(
+            "[documents/paginated][%s] Request start workspace=%s status_filter=%s page=%s page_size=%s sort_field=%s sort_direction=%s",
+            trace_id,
+            rag.workspace,
+            filter_label,
+            request.page,
+            request.page_size,
+            request.sort_field,
+            request.sort_direction,
+        )
+
+        try:
+
+            async def _run_timed(label: str, awaitable):
+                """Await ``awaitable`` and log its start, duration and outcome under ``label``."""
+                began = time.perf_counter()
+                performance_timing_log(
+                    "[documents/paginated][%s] %s started",
+                    trace_id,
+                    label,
+                )
+                try:
+                    outcome = await awaitable
+                except Exception:
+                    performance_timing_log(
+                        "[documents/paginated][%s] %s failed after %.4fs",
+                        trace_id,
+                        label,
+                        time.perf_counter() - began,
+                    )
+                    raise
+                else:
+                    performance_timing_log(
+                        "[documents/paginated][%s] %s completed in %.4fs",
+                        trace_id,
+                        label,
+                        time.perf_counter() - began,
+                    )
+                    return outcome
+
+            # Wrap both storage queries in tasks before awaiting either of them.
+            tasks_created_at = time.perf_counter()
+            page_task = asyncio.create_task(
+                _run_timed(
+                    "get_docs_paginated",
+                    rag.doc_status.get_docs_paginated(
+                        status_filter=request.status_filter,
+                        page=request.page,
+                        page_size=request.page_size,
+                        sort_field=request.sort_field,
+                        sort_direction=request.sort_direction,
+                    ),
+                )
+            )
+            counts_task = asyncio.create_task(
+                _run_timed(
+                    "get_all_status_counts",
+                    rag.doc_status.get_all_status_counts(),
+                )
+            )
+            performance_timing_log(
+                "[documents/paginated][%s] Query tasks created in %.4fs",
+                trace_id,
+                time.perf_counter() - tasks_created_at,
+            )
+
+            await_started_at = time.perf_counter()
+            page_result, status_counts = await asyncio.gather(page_task, counts_task)
+            documents_with_ids, total_count = page_result
+            performance_timing_log(
+                "[documents/paginated][%s] Query tasks awaited in %.4fs",
+                trace_id,
+                time.perf_counter() - await_started_at,
+            )
+
+            # Convert the (id, record) pairs into API response models.
+            assembly_started_at = time.perf_counter()
+            doc_responses = [
+                DocStatusResponse(
+                    id=doc_id,
+                    content_summary=doc.content_summary,
+                    content_length=doc.content_length,
+                    status=doc.status,
+                    created_at=format_datetime(doc.created_at),
+                    updated_at=format_datetime(doc.updated_at),
+                    track_id=doc.track_id,
+                    chunks_count=doc.chunks_count,
+                    error_msg=doc.error_msg,
+                    metadata=doc.metadata,
+                    file_path=normalize_file_path(doc.file_path),
+                )
+                for doc_id, doc in documents_with_ids
+            ]
+
+            # Ceiling division: a partially filled last page still counts as a page.
+            total_pages = (total_count + request.page_size - 1) // request.page_size
+            response = PaginatedDocsResponse(
+                documents=doc_responses,
+                pagination=PaginationInfo(
+                    page=request.page,
+                    page_size=request.page_size,
+                    total_count=total_count,
+                    total_pages=total_pages,
+                    has_next=request.page < total_pages,
+                    has_prev=request.page > 1,
+                ),
+                status_counts=status_counts,
+            )
+            assembly_elapsed = time.perf_counter() - assembly_started_at
+            total_elapsed = time.perf_counter() - started_at
+
+            performance_timing_log(
+                "[documents/paginated][%s] Response assembled in %.4fs",
+                trace_id,
+                assembly_elapsed,
+            )
+            performance_timing_log(
+                "[documents/paginated][%s] Request completed in %.4fs returned_rows=%s total_count=%s status_count_keys=%s",
+                trace_id,
+                total_elapsed,
+                len(doc_responses),
+                total_count,
+                sorted(status_counts.keys()),
+            )
+
+            return response
+
+        except Exception as e:
+            performance_timing_log(
+                "[documents/paginated][%s] Request failed after %.4fs",
+                trace_id,
+                time.perf_counter() - started_at,
+            )
+            logger.error(f"Error getting paginated documents: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.get(
+        "/status_counts",
+        response_model=StatusCountsResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def get_document_status_counts() -> StatusCountsResponse:
+        """
+        Get counts of documents by status.
+
+        This endpoint retrieves the count of documents in each processing status
+        (PENDING, PROCESSING, PROCESSED, FAILED) for all documents in the system.
+
+        Returns:
+            StatusCountsResponse: A response object containing status counts
+
+        Raises:
+            HTTPException: If an error occurs while retrieving status counts (500).
+        """
+        try:
+            # Query and response construction share one guard so either failure maps to 500.
+            return StatusCountsResponse(
+                status_counts=await rag.doc_status.get_all_status_counts()
+            )
+        except Exception as e:
+            logger.error(f"Error getting document status counts: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.post(
+        "/reprocess_failed",
+        response_model=ReprocessResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def reprocess_failed_documents(background_tasks: BackgroundTasks):
+        """
+        Reprocess failed and pending documents.
+
+        This endpoint triggers the document processing pipeline which automatically
+        picks up and reprocesses documents in the following statuses:
+        - FAILED: Documents that failed during previous processing attempts
+        - PENDING: Documents waiting to be processed
+        - PROCESSING: Documents with abnormally terminated processing (e.g., server crashes)
+
+        This is useful for recovering from server crashes, network errors, LLM service
+        outages, or other temporary failures that caused document processing to fail.
+
+        The processing happens in the background and can be monitored by checking the
+        pipeline status. The reprocessed documents retain their original track_id from
+        initial upload, so use their original track_id to monitor progress.
+
+        Returns:
+            ReprocessResponse: Response with status and message.
+                track_id is always empty string because reprocessed documents retain
+                their original track_id from initial upload.
+
+        Raises:
+            HTTPException: If an error occurs while initiating reprocessing (500).
+        """
+        try:
+            # Only schedule the pipeline run here; this handler does not await it.
+            # No new track_id is issued: documents keep the one from their upload.
+            background_tasks.add_task(rag.apipeline_process_enqueue_documents)
+            logger.info("Reprocessing of failed documents initiated")
+
+            acknowledgement = ReprocessResponse(
+                status="reprocessing_started",
+                message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
+            )
+            return acknowledgement
+
+        except Exception as e:
+            logger.error(f"Error initiating reprocessing of failed documents: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @router.post(
+        "/cancel_pipeline",
+        response_model=CancelPipelineResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def cancel_pipeline():
+        """
+        Request cancellation of the currently running pipeline.
+
+        This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
+        1. Check this flag at key processing points
+        2. Stop processing new documents
+        3. Cancel all running document processing tasks
+        4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
+
+        The cancellation is graceful and ensures data consistency. Documents that have
+        completed processing will remain in PROCESSED status.
+
+        Returns:
+            CancelPipelineResponse: Response with status and message
+                - status="cancellation_requested": Cancellation flag has been set
+                - status="not_busy": Pipeline is not currently running
+
+        Raises:
+            HTTPException: If an error occurs while setting cancellation flag (500).
+        """
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+            )
+
+            namespace = "pipeline_status"
+            pipeline_status = await get_namespace_data(
+                namespace, workspace=rag.workspace
+            )
+            status_lock = get_namespace_lock(namespace, workspace=rag.workspace)
+
+            # The busy check and the flag update happen under the same lock.
+            async with status_lock:
+                if pipeline_status.get("busy", False):
+                    # Set cancellation flag and record the request in the status log
+                    pipeline_status["cancellation_requested"] = True
+                    cancel_msg = "Pipeline cancellation requested by user"
+                    logger.info(cancel_msg)
+                    pipeline_status["latest_message"] = cancel_msg
+                    pipeline_status["history_messages"].append(cancel_msg)
+
+                    return CancelPipelineResponse(
+                        status="cancellation_requested",
+                        message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
+                    )
+
+                # Nothing is running, so there is nothing to cancel.
+                return CancelPipelineResponse(
+                    status="not_busy",
+                    message="Pipeline is not currently running. No cancellation needed.",
+                )
+
+        except Exception as e:
+            logger.error(f"Error requesting pipeline cancellation: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+ return router
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/routers/graph_routes.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/routers/graph_routes.py
new file mode 100644
index 0000000..e892ff0
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/api/routers/graph_routes.py
@@ -0,0 +1,688 @@
+"""
+This module contains all graph-related routes for the LightRAG API.
+"""
+
+from typing import Optional, Dict, Any
+import traceback
+from fastapi import APIRouter, Depends, Query, HTTPException
+from pydantic import BaseModel, Field
+
+from lightrag.utils import logger
+from ..utils_api import get_combined_auth_dependency
+
+router = APIRouter(tags=["graph"])
+
+
+class EntityUpdateRequest(BaseModel):
+    # Request body of POST /graph/entity/edit. Field descriptions bring this
+    # model in line with the other request models in this module.
+    entity_name: str = Field(..., description="Name of the entity to update")
+    updated_data: Dict[str, Any] = Field(
+        ..., description="Dictionary of properties to update"
+    )
+    allow_rename: bool = Field(
+        default=False, description="Whether to allow entity renaming"
+    )
+    allow_merge: bool = Field(
+        default=False,
+        description="Whether to merge into existing entity when renaming causes name conflict",
+    )
+
+
+class RelationUpdateRequest(BaseModel):
+    # Request body of POST /graph/relation/edit. source_id / target_id are
+    # forwarded as the source and target entity of the relation being edited.
+    source_id: str = Field(
+        ..., description="Source entity of the relation to update"
+    )
+    target_id: str = Field(
+        ..., description="Target entity of the relation to update"
+    )
+    updated_data: Dict[str, Any] = Field(
+        ..., description="Dictionary of relation properties to update"
+    )
+
+
+class EntityMergeRequest(BaseModel):
+    # Source entities that are merged into the target.
+    entities_to_change: list[str] = Field(
+        ...,
+        min_length=1,
+        examples=[["Elon Msk", "Ellon Musk"]],
+        description="List of entity names to be merged and deleted. These are typically duplicate or misspelled entities.",
+    )
+    # Target entity of the merge.
+    entity_to_change_into: str = Field(
+        ...,
+        min_length=1,
+        examples=["Elon Musk"],
+        description="Target entity name that will receive all relationships from the source entities. This entity will be preserved.",
+    )
+
+
+class EntityCreateRequest(BaseModel):
+    # Name of the entity to create; must be non-empty.
+    entity_name: str = Field(
+        ...,
+        min_length=1,
+        examples=["Tesla"],
+        description="Unique name for the new entity",
+    )
+    # Free-form property mapping for the new entity.
+    entity_data: dict[str, Any] = Field(
+        ...,
+        examples=[
+            {
+                "description": "Electric vehicle manufacturer",
+                "entity_type": "ORGANIZATION",
+            }
+        ],
+        description="Dictionary containing entity properties. Common fields include 'description' and 'entity_type'.",
+    )
+
+
+class RelationCreateRequest(BaseModel):
+    # Both endpoints are given by entity name and must be non-empty.
+    source_entity: str = Field(
+        ...,
+        min_length=1,
+        examples=["Elon Musk"],
+        description="Name of the source entity. This entity must already exist in the knowledge graph.",
+    )
+    target_entity: str = Field(
+        ...,
+        min_length=1,
+        examples=["Tesla"],
+        description="Name of the target entity. This entity must already exist in the knowledge graph.",
+    )
+    # Free-form property mapping for the new relation.
+    relation_data: dict[str, Any] = Field(
+        ...,
+        examples=[
+            {
+                "description": "Elon Musk is the CEO of Tesla",
+                "keywords": "CEO, founder",
+                "weight": 1.0,
+            }
+        ],
+        description="Dictionary containing relationship properties. Common fields include 'description', 'keywords', and 'weight'.",
+    )
+
+
+def create_graph_routes(rag, api_key: Optional[str] = None):
+ combined_auth = get_combined_auth_dependency(api_key)
+
+    @router.get("/graph/label/list", dependencies=[Depends(combined_auth)])
+    async def get_graph_labels():
+        """
+        Get all graph labels
+
+        Returns:
+            List[str]: List of graph labels
+        """
+        try:
+            labels = await rag.get_graph_labels()
+            return labels
+        except Exception as e:
+            # The log line and the HTTP detail carry the same text.
+            failure = f"Error getting graph labels: {str(e)}"
+            logger.error(failure)
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=failure)
+
+    @router.get("/graph/label/popular", dependencies=[Depends(combined_auth)])
+    async def get_popular_labels(
+        limit: int = Query(
+            300, description="Maximum number of popular labels to return", ge=1, le=1000
+        ),
+    ):
+        """
+        Get popular labels by node degree (most connected entities)
+
+        Args:
+            limit (int): Maximum number of labels to return (default: 300, max: 1000)
+
+        Returns:
+            List[str]: List of popular labels sorted by degree (highest first)
+        """
+        graph = rag.chunk_entity_relation_graph
+        try:
+            return await graph.get_popular_labels(limit)
+        except Exception as e:
+            # The log line and the HTTP detail carry the same text.
+            failure = f"Error getting popular labels: {str(e)}"
+            logger.error(failure)
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=failure)
+
+    @router.get("/graph/label/search", dependencies=[Depends(combined_auth)])
+    async def search_labels(
+        q: str = Query(..., description="Search query string"),
+        limit: int = Query(
+            50, description="Maximum number of search results to return", ge=1, le=100
+        ),
+    ):
+        """
+        Search labels with fuzzy matching
+
+        Args:
+            q (str): Search query string
+            limit (int): Maximum number of results to return (default: 50, max: 100)
+
+        Returns:
+            List[str]: List of matching labels sorted by relevance
+        """
+        try:
+            matches = await rag.chunk_entity_relation_graph.search_labels(q, limit)
+            return matches
+        except Exception as e:
+            reason = str(e)
+            # The query text goes to the log only; the HTTP detail omits it.
+            logger.error(f"Error searching labels with query '{q}': {reason}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(
+                status_code=500, detail=f"Error searching labels: {reason}"
+            )
+
+    @router.get("/graphs", dependencies=[Depends(combined_auth)])
+    async def get_knowledge_graph(
+        label: str = Query(..., description="Label to get knowledge graph for"),
+        max_depth: int = Query(3, description="Maximum depth of graph", ge=1),
+        max_nodes: int = Query(1000, description="Maximum nodes to return", ge=1),
+    ):
+        """
+        Retrieve a connected subgraph of nodes where the label includes the specified label.
+        When reducing the number of nodes, the prioritization criteria are as follows:
+            1. Hops (path) to the starting node take precedence
+            2. Followed by the degree of the nodes
+
+        Args:
+            label (str): Label of the starting node
+            max_depth (int, optional): Maximum depth of the subgraph, defaults to 3
+            max_nodes (int, optional): Maximum nodes to return, defaults to 1000
+
+        Returns:
+            Dict[str, List[str]]: Knowledge graph for label
+        """
+        try:
+            # repr() and the length make stray leading/trailing spaces in the label visible.
+            logger.debug(
+                f"get_knowledge_graph called with label: '{label}' (length: {len(label)}, repr: {repr(label)})"
+            )
+
+            subgraph = await rag.get_knowledge_graph(
+                node_label=label,
+                max_depth=max_depth,
+                max_nodes=max_nodes,
+            )
+            return subgraph
+        except Exception as e:
+            reason = str(e)
+            logger.error(f"Error getting knowledge graph for label '{label}': {reason}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(
+                status_code=500, detail=f"Error getting knowledge graph: {reason}"
+            )
+
+    @router.get("/graph/entity/exists", dependencies=[Depends(combined_auth)])
+    async def check_entity_exists(
+        name: str = Query(..., description="Entity name to check"),
+    ):
+        """
+        Check if an entity with the given name exists in the knowledge graph
+
+        Args:
+            name (str): Name of the entity to check
+
+        Returns:
+            Dict[str, bool]: Dictionary with 'exists' key indicating if entity exists
+        """
+        try:
+            return {"exists": await rag.chunk_entity_relation_graph.has_node(name)}
+        except Exception as e:
+            reason = str(e)
+            logger.error(f"Error checking entity existence for '{name}': {reason}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(
+                status_code=500, detail=f"Error checking entity existence: {reason}"
+            )
+
+    @router.post("/graph/entity/edit", dependencies=[Depends(combined_auth)])
+    async def update_entity(request: EntityUpdateRequest):
+        """
+        Update an entity's properties in the knowledge graph
+
+        This endpoint allows updating entity properties, including renaming entities.
+        When renaming to an existing entity name, the behavior depends on allow_merge
+        (see "Behavior when renaming to an existing entity" below).
+
+        Args:
+            request (EntityUpdateRequest): Request containing:
+                - entity_name (str): Name of the entity to update
+                - updated_data (Dict[str, Any]): Dictionary of properties to update
+                - allow_rename (bool): Whether to allow entity renaming (default: False)
+                - allow_merge (bool): Whether to merge into existing entity when renaming
+                                     causes name conflict (default: False)
+
+        Returns:
+            Dict with the following structure:
+            {
+                "status": "success",
+                "message": "Entity updated successfully" | "Entity merged successfully into 'target_name'",
+                "data": {
+                    "entity_name": str,        # Final entity name
+                    "description": str,        # Entity description
+                    "entity_type": str,        # Entity type
+                    "source_id": str,         # Source chunk IDs
+                    ...                       # Other entity properties
+                },
+                "operation_summary": {
+                    "merged": bool,           # Whether entity was merged into another
+                    "merge_status": str,      # "success" | "failed" | "not_attempted"
+                    "merge_error": str | None, # Error message if merge failed
+                    "operation_status": str,  # "success" | "partial_success" | "failure"
+                    "target_entity": str | None, # Target entity name if renaming/merging
+                    "final_entity": str,      # Final entity name after operation
+                    "renamed": bool           # Whether entity was renamed
+                }
+            }
+
+        operation_status values explained:
+            - "success": All operations completed successfully
+                * For simple updates: entity properties updated
+                * For renames: entity renamed successfully
+                * For merges: non-name updates applied AND merge completed
+
+            - "partial_success": Update succeeded but merge failed
+                * Non-name property updates were applied successfully
+                * Merge operation failed (entity not merged)
+                * Original entity still exists with updated properties
+                * Use merge_error for failure details
+
+            - "failure": Operation failed completely
+                * If merge_status == "failed": Merge attempted but both update and merge failed
+                * If merge_status == "not_attempted": Regular update failed
+                * No changes were applied to the entity
+
+        merge_status values explained:
+            - "success": Entity successfully merged into target entity
+            - "failed": Merge operation was attempted but failed
+            - "not_attempted": No merge was attempted (normal update/rename)
+
+        Behavior when renaming to an existing entity:
+            - If allow_merge=False: Raises ValueError with 400 status (default behavior)
+            - If allow_merge=True: Automatically merges the source entity into the existing target entity,
+                                  preserving all relationships and applying non-name updates first
+
+        Example Request (simple update):
+            POST /graph/entity/edit
+            {
+                "entity_name": "Tesla",
+                "updated_data": {"description": "Updated description"},
+                "allow_rename": false,
+                "allow_merge": false
+            }
+
+        Example Response (simple update success):
+            {
+                "status": "success",
+                "message": "Entity updated successfully",
+                "data": { ... },
+                "operation_summary": {
+                    "merged": false,
+                    "merge_status": "not_attempted",
+                    "merge_error": null,
+                    "operation_status": "success",
+                    "target_entity": null,
+                    "final_entity": "Tesla",
+                    "renamed": false
+                }
+            }
+
+        Example Request (rename with auto-merge):
+            POST /graph/entity/edit
+            {
+                "entity_name": "Elon Msk",
+                "updated_data": {
+                    "entity_name": "Elon Musk",
+                    "description": "Corrected description"
+                },
+                "allow_rename": true,
+                "allow_merge": true
+            }
+
+        Example Response (merge success):
+            {
+                "status": "success",
+                "message": "Entity merged successfully into 'Elon Musk'",
+                "data": { ... },
+                "operation_summary": {
+                    "merged": true,
+                    "merge_status": "success",
+                    "merge_error": null,
+                    "operation_status": "success",
+                    "target_entity": "Elon Musk",
+                    "final_entity": "Elon Musk",
+                    "renamed": true
+                }
+            }
+
+        Example Response (partial success - update succeeded but merge failed):
+            {
+                "status": "success",
+                "message": "Entity updated successfully",
+                "data": { ... },  # Data reflects updated "Elon Msk" entity
+                "operation_summary": {
+                    "merged": false,
+                    "merge_status": "failed",
+                    "merge_error": "Target entity locked by another operation",
+                    "operation_status": "partial_success",
+                    "target_entity": "Elon Musk",
+                    "final_entity": "Elon Msk",  # Original entity still exists
+                    "renamed": true
+                }
+            }
+        """
+        try:
+            result = await rag.aedit_entity(
+                entity_name=request.entity_name,
+                updated_data=request.updated_data,
+                allow_rename=request.allow_rename,
+                allow_merge=request.allow_merge,
+            )
+
+            # Fallback summary for results that carry no "operation_summary" key
+            # (backward compatibility): describe a plain update or rename.
+            requested_name = request.updated_data.get(
+                "entity_name", request.entity_name
+            )
+            fallback_summary = {
+                "merged": False,
+                "merge_status": "not_attempted",
+                "merge_error": None,
+                "operation_status": "success",
+                "target_entity": None,
+                "final_entity": requested_name,
+                "renamed": requested_name != request.entity_name,
+            }
+            operation_summary = result.get("operation_summary", fallback_summary)
+
+            # "data" carries the entity fields only; the summary is returned separately.
+            entity_data = {
+                key: value
+                for key, value in result.items()
+                if key != "operation_summary"
+            }
+
+            # The merge wording is used only when the summary reports a merge.
+            if operation_summary.get("merged"):
+                response_message = f"Entity merged successfully into '{operation_summary['final_entity']}'"
+            else:
+                response_message = "Entity updated successfully"
+
+            return {
+                "status": "success",
+                "message": response_message,
+                "data": entity_data,
+                "operation_summary": operation_summary,
+            }
+        except ValueError as ve:
+            # A ValueError is reported to the client as 400 rather than 500.
+            logger.error(
+                f"Validation error updating entity '{request.entity_name}': {str(ve)}"
+            )
+            raise HTTPException(status_code=400, detail=str(ve))
+        except Exception as e:
+            logger.error(f"Error updating entity '{request.entity_name}': {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(
+                status_code=500, detail=f"Error updating entity: {str(e)}"
+            )
+
+    @router.post("/graph/relation/edit", dependencies=[Depends(combined_auth)])
+    async def update_relation(request: RelationUpdateRequest):
+        """Update a relation's properties in the knowledge graph
+
+        Args:
+            request (RelationUpdateRequest): Request containing source ID, target ID and updated data
+
+        Returns:
+            Dict: Updated relation information
+        """
+        try:
+            # source_id / target_id are forwarded as the relation's endpoint entities.
+            updated_relation = await rag.aedit_relation(
+                source_entity=request.source_id,
+                target_entity=request.target_id,
+                updated_data=request.updated_data,
+            )
+            return {
+                "status": "success",
+                "message": "Relation updated successfully",
+                "data": updated_relation,
+            }
+        except ValueError as ve:
+            # A ValueError is reported to the client as 400 rather than 500.
+            logger.error(
+                f"Validation error updating relation between '{request.source_id}' and '{request.target_id}': {str(ve)}"
+            )
+            raise HTTPException(status_code=400, detail=str(ve))
+        except Exception as e:
+            logger.error(
+                f"Error updating relation between '{request.source_id}' and '{request.target_id}': {str(e)}"
+            )
+            logger.error(traceback.format_exc())
+            raise HTTPException(
+                status_code=500, detail=f"Error updating relation: {str(e)}"
+            )
+
+ @router.post("/graph/entity/create", dependencies=[Depends(combined_auth)])
+ async def create_entity(request: EntityCreateRequest):
+ """
+ Create a new entity in the knowledge graph
+
+ This endpoint creates a new entity node in the knowledge graph with the specified
+ properties. The system automatically generates vector embeddings for the entity
+ to enable semantic search and retrieval.
+
+ Request Body:
+ entity_name (str): Unique name identifier for the entity
+ entity_data (dict): Entity properties including:
+ - description (str): Textual description of the entity
+ - entity_type (str): Category/type of the entity (e.g., PERSON, ORGANIZATION, LOCATION)
+ - source_id (str): Related chunk_id from which the description originates
+ - Additional custom properties as needed
+
+ Response Schema:
+ {
+ "status": "success",
+ "message": "Entity 'Tesla' created successfully",
+ "data": {
+ "entity_name": "Tesla",
+ "description": "Electric vehicle manufacturer",
+ "entity_type": "ORGANIZATION",
+ "source_id": "chunk-123this.formattedUUID(s,o))),__publicField(this,"formattedUUID",((s,o)=>{const i={$r:this.randomUUID,$s:this.sequentialUUID,$t:this.stamp};return s.replace(/\$[rs]\d{0,}|\$t0|\$t[1-9]\d{1,}/g,(s=>{const a=s.slice(0,2),u=Number.parseInt(s.slice(2),10);return"$s"===a?i[a]().padStart(u,"0"):"$t"===a&&o?i[a](u,o):i[a](u)}))})),__publicField(this,"availableUUIDs",((s=this.uuidLength)=>Number.parseFloat(([...new Set(this.dict)].length**s).toFixed(0)))),__publicField(this,"_collisionCache",new Map),__publicField(this,"approxMaxBeforeCollision",((s=this.availableUUIDs(this.uuidLength))=>{const o=s,i=this._collisionCache.get(o);if(void 0!==i)return i;const a=Number.parseFloat(Math.sqrt(Math.PI/2*s).toFixed(20));return this._collisionCache.set(o,a),a})),__publicField(this,"collisionProbability",((s=this.availableUUIDs(this.uuidLength),o=this.uuidLength)=>Number.parseFloat((this.approxMaxBeforeCollision(s)/this.availableUUIDs(o)).toFixed(20)))),__publicField(this,"uniqueness",((s=this.availableUUIDs(this.uuidLength))=>{const o=Number.parseFloat((1-this.approxMaxBeforeCollision(s)/s).toFixed(20));return o>1?1:o<0?0:o})),__publicField(this,"getVersion",(()=>this.version)),__publicField(this,"stamp",((s,o)=>{const i=Math.floor(+(o||new Date)/1e3).toString(16);if("number"==typeof s&&0===s)return i;if("number"!=typeof s||s<10)throw new Error(["Param finalLength must be a number greater than or equal to 10,","or 0 if you want the raw hexadecimal timestamp"].join("\n"));const a=s-9,u=Math.round(Math.random()*(a>15?15:a)),_=this.randomUUID(a);return"".concat(_.substring(0,u)).concat(i).concat(_.substring(u)).concat(u.toString(16))})),__publicField(this,"parseStamp",((s,o)=>{if(o&&!/t0|t[1-9]\d{1,}/.test(o))throw new Error("Cannot extract date from a formated UUID with no timestamp in the format");const i=o?o.replace(/\$[rs]\d{0,}|\$t0|\$t[1-9]\d{1,}/g,(s=>{const 
o={$r:s=>[...Array(s)].map((()=>"r")).join(""),$s:s=>[...Array(s)].map((()=>"s")).join(""),$t:s=>[...Array(s)].map((()=>"t")).join("")},i=s.slice(0,2),a=Number.parseInt(s.slice(2),10);return o[i](a)})).replace(/^(.*?)(t{8,})(.*)$/g,((o,i,a)=>s.substring(i.length,i.length+a.length))):s;if(8===i.length)return new Date(1e3*Number.parseInt(i,16));if(i.length<10)throw new Error("Stamp length invalid");const a=Number.parseInt(i.substring(i.length-1),16);return new Date(1e3*Number.parseInt(i.substring(a,a+8),16))})),__publicField(this,"setCounter",(s=>{this.counter=s})),__publicField(this,"validate",((s,o)=>{const i=o?this._normalizeDictionary(o):this.dict;return s.split("").every((s=>i.includes(s)))}));const o=__spreadValues(__spreadValues({},C),s);this.counter=0,this.debug=!1,this.dict=[],this.version="5.3.2";const{dictionary:i,shuffle:a,length:u,counter:_}=o;this.uuidLength=u,this.setDictionary(i,a),this.setCounter(_),this.debug=o.debug,this.log(this.dict),this.log("Generator instantiated with Dictionary Size ".concat(this.dictLength," and counter set to ").concat(this.counter)),this.log=this.log.bind(this),this.setDictionary=this.setDictionary.bind(this),this.setCounter=this.setCounter.bind(this),this.seq=this.seq.bind(this),this.sequentialUUID=this.sequentialUUID.bind(this),this.rnd=this.rnd.bind(this),this.randomUUID=this.randomUUID.bind(this),this.fmt=this.fmt.bind(this),this.formattedUUID=this.formattedUUID.bind(this),this.availableUUIDs=this.availableUUIDs.bind(this),this.approxMaxBeforeCollision=this.approxMaxBeforeCollision.bind(this),this.collisionProbability=this.collisionProbability.bind(this),this.uniqueness=this.uniqueness.bind(this),this.getVersion=this.getVersion.bind(this),this.stamp=this.stamp.bind(this),this.parseStamp=this.parseStamp.bind(this)}};__publicField(j,"default",j);var L,B=j;return L=w,((a,_,w,x)=>{if(_&&"object"==typeof _||"function"==typeof _)for(let C of 
i(_))u.call(a,C)||C===w||s(a,C,{get:()=>_[C],enumerable:!(x=o(_,C))||x.enumerable});return a})(s({},"__esModule",{value:!0}),L)})();s.exports=o.default,"undefined"!=typeof window&&(o=o.default)},9325:(s,o,i)=>{var a=i(34840),u="object"==typeof self&&self&&self.Object===Object&&self,_=a||u||Function("return this")();s.exports=_},9404:function(s){s.exports=function(){"use strict";var s=Array.prototype.slice;function createClass(s,o){o&&(s.prototype=Object.create(o.prototype)),s.prototype.constructor=s}function Iterable(s){return isIterable(s)?s:Seq(s)}function KeyedIterable(s){return isKeyed(s)?s:KeyedSeq(s)}function IndexedIterable(s){return isIndexed(s)?s:IndexedSeq(s)}function SetIterable(s){return isIterable(s)&&!isAssociative(s)?s:SetSeq(s)}function isIterable(s){return!(!s||!s[o])}function isKeyed(s){return!(!s||!s[i])}function isIndexed(s){return!(!s||!s[a])}function isAssociative(s){return isKeyed(s)||isIndexed(s)}function isOrdered(s){return!(!s||!s[u])}createClass(KeyedIterable,Iterable),createClass(IndexedIterable,Iterable),createClass(SetIterable,Iterable),Iterable.isIterable=isIterable,Iterable.isKeyed=isKeyed,Iterable.isIndexed=isIndexed,Iterable.isAssociative=isAssociative,Iterable.isOrdered=isOrdered,Iterable.Keyed=KeyedIterable,Iterable.Indexed=IndexedIterable,Iterable.Set=SetIterable;var o="@@__IMMUTABLE_ITERABLE__@@",i="@@__IMMUTABLE_KEYED__@@",a="@@__IMMUTABLE_INDEXED__@@",u="@@__IMMUTABLE_ORDERED__@@",_="delete",w=5,x=1<", "\\n\\n"]\')',
+ "temperature": "Controls randomness (0.0-2.0, higher = more creative)",
+ "top_p": "Nucleus sampling parameter (0.0-1.0, lower = more focused)",
+ "max_tokens": "Maximum number of tokens to generate (deprecated, use max_completion_tokens instead)",
+ "extra_body": 'Extra body parameters for OpenRouter of vLLM (JSON dict, e.g., \'"reasoning": {"reasoning": {"enabled": false}}\')',
+ }
+
+
+# =============================================================================
+# Main Section - For Testing and Sample Generation
+# =============================================================================
+#
+# When run as a script, this module:
+# 1. Generates and prints a sample .env file with all binding options
+# 2. If "test" argument is provided, demonstrates argument parsing with the Ollama and OpenAI bindings
+#
+# Usage:
+# python -m lightrag.llm.binding_options # Generate .env sample
+# python -m lightrag.llm.binding_options test # Test argument parsing
+#
+# =============================================================================
+
+if __name__ == "__main__":
+    import os
+    import sys
+
+    import dotenv
+
+    # override=False: variables already set in the environment are not replaced.
+    dotenv.load_dotenv(dotenv_path=".env", override=False)
+
+    run_self_test = sys.argv[1:2] == ["test"]
+
+    if not run_self_test:
+        # Default mode: print a sample .env covering all binding options.
+        print(BindingOptions.generate_dot_env_sample())
+    else:
+        rule = "=" * 50
+
+        # Register the Ollama embedding, Ollama LLM and OpenAI LLM options on one parser
+        parser = ArgumentParser(description="Test binding options")
+        for options_cls in (OllamaEmbeddingOptions, OllamaLLMOptions, OpenAILLMOptions):
+            options_cls.add_args(parser)
+
+        # Parse a fixed argument list instead of the real command line
+        sample_argv = [
+            "--ollama-embedding-num_ctx",
+            "1024",
+            "--ollama-llm-num_ctx",
+            "2048",
+            "--openai-llm-temperature",
+            "0.7",
+            "--openai-llm-max_completion_tokens",
+            "1000",
+            "--openai-llm-stop",
+            '["", "\\n\\n"]',
+            "--openai-llm-reasoning",
+            '{"effort": "high", "max_tokens": 2000, "exclude": false, "enabled": true}',
+        ]
+        args = parser.parse_args(sample_argv)
+        print("Final args for LLM and Embedding:")
+        print(f"{args}\n")
+
+        print("Ollama LLM options:")
+        print(OllamaLLMOptions.options_dict(args))
+
+        print("\nOllama Embedding options:")
+        print(OllamaEmbeddingOptions.options_dict(args))
+
+        print("\nOpenAI LLM options:")
+        print(OpenAILLMOptions.options_dict(args))
+
+        # Build an OpenAI options instance directly from keyword arguments
+        openai_options = OpenAILLMOptions(
+            temperature=0.8,
+            max_completion_tokens=1500,
+            frequency_penalty=0.1,
+            presence_penalty=0.2,
+            stop=["<|end|>", "\n\n"],
+        )
+        print("\nOpenAI LLM options instance:")
+        print(openai_options.asdict())
+
+        # Same, this time including the dict-valued reasoning parameter
+        reasoning_settings = {
+            "effort": "medium",
+            "max_tokens": 1500,
+            "exclude": True,
+            "enabled": True,
+        }
+        openai_options_with_reasoning = OpenAILLMOptions(
+            temperature=0.9,
+            max_completion_tokens=2000,
+            reasoning=reasoning_settings,
+        )
+        print("\nOpenAI LLM options instance with reasoning:")
+        print(openai_options_with_reasoning.asdict())
+
+        # ---- dict-valued option parsing ----
+        print("\n" + rule)
+        print("TESTING DICT PARSING FUNCTIONALITY")
+        print(rule)
+
+        test_parser = ArgumentParser(description="Test dict parsing")
+        OpenAILLMOptions.add_args(test_parser)
+
+        # Case 1: a well-formed JSON object must parse
+        try:
+            test_args = test_parser.parse_args(
+                ["--openai-llm-reasoning", '{"effort": "low", "max_tokens": 1000}']
+            )
+            print("✓ Valid JSON dict parsing successful:")
+            print(
+                f"  Parsed reasoning: {OpenAILLMOptions.options_dict(test_args)['reasoning']}"
+            )
+        except Exception as e:
+            print(f"✗ Valid JSON dict parsing failed: {e}")
+
+        # Case 2: malformed JSON (missing closing brace) must be rejected;
+        # SystemExit is caught separately because `except Exception` does not cover it
+        try:
+            test_args = test_parser.parse_args(
+                [
+                    "--openai-llm-reasoning",
+                    '{"effort": "low", "max_tokens": 1000',
+                ]
+            )
+            print("✗ Invalid JSON should have failed but didn't")
+        except SystemExit:
+            print("✓ Invalid JSON dict parsing correctly rejected")
+        except Exception as e:
+            print(f"✓ Invalid JSON dict parsing correctly rejected: {e}")
+
+        # Case 3: valid JSON that is an array rather than an object must be rejected
+        try:
+            test_args = test_parser.parse_args(
+                [
+                    "--openai-llm-reasoning",
+                    '["not", "a", "dict"]',
+                ]
+            )
+            print("✗ Non-dict JSON should have failed but didn't")
+        except SystemExit:
+            print("✓ Non-dict JSON parsing correctly rejected")
+        except Exception as e:
+            print(f"✓ Non-dict JSON parsing correctly rejected: {e}")
+
+        # ---- dict-valued option supplied through the environment ----
+        print("\n" + rule)
+        print("TESTING ENVIRONMENT VARIABLE SUPPORT")
+        print(rule)
+
+        os.environ["OPENAI_LLM_REASONING"] = (
+            '{"effort": "high", "max_tokens": 3000, "exclude": false}'
+        )
+
+        env_parser = ArgumentParser(description="Test env var dict parsing")
+        OpenAILLMOptions.add_args(env_parser)
+
+        try:
+            # No command line args, so the value has to come from the env var
+            env_args = env_parser.parse_args([])
+            reasoning_from_env = OpenAILLMOptions.options_dict(env_args).get(
+                "reasoning"
+            )
+            if not reasoning_from_env:
+                print("✗ Environment variable dict parsing failed: No reasoning found")
+            else:
+                print("✓ Environment variable dict parsing successful:")
+                print(f"  Parsed reasoning from env: {reasoning_from_env}")
+        except Exception as e:
+            print(f"✗ Environment variable dict parsing failed: {e}")
+        finally:
+            # Remove the variable set above
+            os.environ.pop("OPENAI_LLM_REASONING", None)
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/deprecated/siliconcloud.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/deprecated/siliconcloud.py
new file mode 100644
index 0000000..fe8da0d
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/deprecated/siliconcloud.py
@@ -0,0 +1,69 @@
+import sys
+
+if sys.version_info < (3, 9):
+ pass
+else:
+ pass
+import pipmaster as pm # Pipmaster for dynamic library install
+
+# install specific modules
+if not pm.is_installed("lmdeploy"):
+ pm.install("lmdeploy")
+
+from openai import (
+ APIConnectionError,
+ RateLimitError,
+ APITimeoutError,
+)
+from tenacity import (
+ retry,
+ stop_after_attempt,
+ wait_exponential,
+ retry_if_exception_type,
+)
+
+
+import numpy as np
+import aiohttp
+import base64
+import struct
+
+
+# NOTE(review): the request below is made with aiohttp, while the retry
+# predicate lists openai exception types -- confirm these can actually be
+# raised on this code path.
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=4, max=60),
+    retry=retry_if_exception_type(
+        (RateLimitError, APIConnectionError, APITimeoutError)
+    ),
+)
+async def siliconcloud_embedding(
+    texts: list[str],
+    model: str = "netease-youdao/bce-embedding-base_v1",
+    base_url: str = "https://api.siliconflow.cn/v1/embeddings",
+    max_token_size: int = 8192,
+    api_key: str = None,
+) -> np.ndarray:
+    """Request embeddings for ``texts`` from a SiliconCloud-style HTTP endpoint.
+
+    Args:
+        texts: Input strings to embed.
+        model: Model identifier sent in the request payload.
+        base_url: Full URL the request is POSTed to.
+        max_token_size: Maximum number of characters kept from each text
+            (the truncation is a plain string slice, despite the name).
+        api_key: API key; a ``"Bearer "`` prefix is added when missing. When
+            no key is given, no ``Authorization`` header is sent.
+
+    Returns:
+        An array built from the decoded float vectors, one entry per item in
+        the response's ``data`` list.
+
+    Raises:
+        ValueError: If the response JSON contains a ``code`` key (presumably
+            the API's error envelope -- confirm against the API docs). The
+            whole response body is passed as the exception argument.
+    """
+    headers = {"Content-Type": "application/json"}
+    if api_key:
+        # Only attach the header when a key exists; a ``None`` header value is
+        # not a valid HTTP header and fails before the request is sent.
+        if not api_key.startswith("Bearer "):
+            api_key = "Bearer " + api_key
+        headers["Authorization"] = api_key
+
+    truncate_texts = [text[0:max_token_size] for text in texts]
+
+    payload = {"model": model, "input": truncate_texts, "encoding_format": "base64"}
+
+    async with aiohttp.ClientSession() as session:
+        async with session.post(base_url, headers=headers, json=payload) as response:
+            content = await response.json()
+            if "code" in content:
+                raise ValueError(content)
+            base64_strings = [item["embedding"] for item in content["data"]]
+
+    embeddings = []
+    for encoded in base64_strings:
+        decode_bytes = base64.b64decode(encoded)
+        # Each embedding is a packed run of little-endian 32-bit floats.
+        n = len(decode_bytes) // 4
+        embeddings.append(struct.unpack(f"<{n}f", decode_bytes))
+    return np.array(embeddings)
diff --git a/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/gemini.py b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/gemini.py
new file mode 100644
index 0000000..19f3586
--- /dev/null
+++ b/.tmp/lightrag_inspect/lightrag_pkg/lightrag/llm/gemini.py
@@ -0,0 +1,623 @@
+"""
+Gemini LLM binding for LightRAG.
+
+This module provides asynchronous helpers that adapt Google's Gemini models
+to the same interface used by the rest of the LightRAG LLM bindings. The
+implementation mirrors the OpenAI helpers while relying on the official
+``google-genai`` client under the hood.
+"""
+
+from __future__ import annotations
+
+import os
+from collections.abc import AsyncIterator
+from functools import lru_cache
+from typing import Any
+
+import numpy as np
+from tenacity import (
+ retry,
+ stop_after_attempt,
+ wait_exponential,
+ retry_if_exception_type,
+)
+
+from lightrag.utils import (
+ logger,
+ remove_think_tags,
+ safe_unicode_decode,
+ wrap_embedding_func_with_attrs,
+)
+
+import pipmaster as pm
+
+# Install the Google Gemini client and its dependencies on demand
+if not pm.is_installed("google-genai"):
+ pm.install("google-genai")
+if not pm.is_installed("google-api-core"):
+ pm.install("google-api-core")
+
+from google import genai # type: ignore
+from google.genai import types # type: ignore
+from google.api_core import exceptions as google_api_exceptions # type: ignore
+
+
+class InvalidResponseError(Exception):
+    """Signals an empty Gemini response so that the retry policy re-attempts the call."""
+
+
+@lru_cache(maxsize=8)
+def _get_gemini_client(
+    api_key: str, base_url: str | None, timeout: int | None = None
+) -> genai.Client:
+    """
+    Create (or fetch cached) Gemini client.
+
+    Clients are memoized on ``(api_key, base_url, timeout)``. The Vertex AI
+    environment variables are read only when a client is first built for a
+    given argument tuple, so later changes to them do not affect a client
+    that is already cached.
+
+    Args:
+        api_key: Google Gemini API key (not used in Vertex AI mode).
+        base_url: Optional custom API endpoint. The placeholder value
+            ``"DEFAULT_GEMINI_ENDPOINT"`` is treated as "no custom endpoint".
+        timeout: Optional request timeout in milliseconds.
+
+    Returns:
+        genai.Client: Configured Gemini client instance.
+
+    Raises:
+        ValueError: If Vertex AI mode is enabled but ``GOOGLE_CLOUD_PROJECT``
+            is not set.
+    """
+    client_kwargs: dict[str, Any] = {}
+
+    if os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true":
+        # Vertex AI mode: use project/location, NOT api_key
+        project = os.getenv("GOOGLE_CLOUD_PROJECT")
+        if not project:
+            raise ValueError(
+                "GOOGLE_CLOUD_PROJECT must be set when using Vertex AI mode"
+            )
+        client_kwargs["vertexai"] = True
+        client_kwargs["project"] = project
+        # An explicitly empty GOOGLE_CLOUD_LOCATION leaves the location unset.
+        location = os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")
+        if location:
+            client_kwargs["location"] = location
+    else:
+        # Standard Gemini API mode: use api_key
+        client_kwargs["api_key"] = api_key
+
+    # Evaluate the endpoint test once instead of repeating a mixed and/or chain.
+    has_custom_endpoint = bool(base_url) and base_url != "DEFAULT_GEMINI_ENDPOINT"
+
+    if has_custom_endpoint or timeout is not None:
+        http_options_kwargs: dict[str, Any] = {}
+        if has_custom_endpoint:
+            http_options_kwargs["base_url"] = base_url
+        if timeout is not None:
+            http_options_kwargs["timeout"] = timeout
+
+        try:
+            client_kwargs["http_options"] = types.HttpOptions(**http_options_kwargs)
+        except Exception as e:
+            logger.error("Failed to apply custom Gemini http_options: %s", e)
+            # Bare raise keeps the original traceback intact.
+            raise
+
+    return genai.Client(**client_kwargs)
+
+
+def _ensure_api_key(api_key: str | None) -> str:
+    """Resolve the Gemini API key to use for a request.
+
+    In Vertex AI mode (``GOOGLE_GENAI_USE_VERTEXAI`` equal to ``"true"``,
+    case-insensitive) an empty string is returned. Otherwise the first
+    non-empty value among the ``api_key`` argument, ``LLM_BINDING_API_KEY``
+    and ``GEMINI_API_KEY`` is returned.
+
+    Raises:
+        ValueError: If no key is available outside Vertex AI mode.
+    """
+    vertex_flag = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "")
+    if vertex_flag.lower() == "true":
+        return ""
+
+    candidates = (
+        api_key,
+        os.getenv("LLM_BINDING_API_KEY"),
+        os.getenv("GEMINI_API_KEY"),
+    )
+    for candidate in candidates:
+        if candidate:
+            return candidate
+
+    raise ValueError(
+        "Gemini API key not provided. "
+        "Set LLM_BINDING_API_KEY or GEMINI_API_KEY in the environment."
+    )
+
+
+def _build_generation_config(
+    base_config: dict[str, Any] | None,
+    system_prompt: str | None,
+    keyword_extraction: bool,
+) -> types.GenerateContentConfig | None:
+    """Assemble the Gemini generation config for one request.
+
+    Works on a copy of ``base_config``. ``system_prompt`` is appended to an
+    existing ``system_instruction`` (newline-separated) or set as the
+    instruction when there is none. With ``keyword_extraction`` enabled and no
+    ``response_mime_type`` configured, ``application/json`` is requested.
+
+    Returns:
+        A ``types.GenerateContentConfig`` built from the remaining entries, or
+        ``None`` when nothing is left after filtering.
+    """
+    config_data: dict[str, Any] = {**(base_config or {})}
+
+    if system_prompt:
+        existing_instruction = config_data.get("system_instruction")
+        config_data["system_instruction"] = (
+            f"{existing_instruction}\n{system_prompt}"
+            if existing_instruction
+            else system_prompt
+        )
+
+    if keyword_extraction:
+        if not config_data.get("response_mime_type"):
+            config_data["response_mime_type"] = "application/json"
+
+    # Drop entries whose value is None or an empty string; other falsy values
+    # (0, False, empty lists) are kept.
+    sanitized: dict[str, Any] = {}
+    for key, value in config_data.items():
+        if value is not None and value != "":
+            sanitized[key] = value
+
+    return types.GenerateContentConfig(**sanitized) if sanitized else None
+
+
+def _format_history_messages(history_messages: list[dict[str, Any]] | None) -> str:
+    """Render prior messages as newline-separated ``[role] content`` lines.
+
+    A missing ``role`` defaults to ``"user"`` and a missing ``content`` to an
+    empty string. ``None`` or an empty list yields an empty string.
+    """
+    return "\n".join(
+        f"[{message.get('role', 'user')}] {message.get('content', '')}"
+        for message in history_messages or []
+    )
+
+
+def _extract_response_text(
+    response: Any, extract_thoughts: bool = False
+) -> tuple[str, str]:
+    """
+    Collect the text parts of a Gemini response, split into answer and thoughts.
+
+    Every candidate's content parts are visited. Parts without text are
+    skipped. Parts flagged via their ``thought`` attribute are collected only
+    when ``extract_thoughts`` is true and are otherwise dropped.
+
+    Args:
+        response: Gemini API response object
+        extract_thoughts: Whether to collect thought content
+
+    Returns:
+        Tuple of (regular_text, thought_text), each joined with newlines;
+        both are empty strings when the response has no candidates.
+    """
+    regular_parts: list[str] = []
+    thought_parts: list[str] = []
+
+    for candidate in getattr(response, "candidates", None) or []:
+        content = getattr(candidate, "content", None)
+        if not content:
+            continue
+
+        # ``parts`` may be absent or None; treat both as "no parts".
+        for part in getattr(content, "parts", None) or []:
+            text = getattr(part, "text", None)
+            if not text:
+                continue
+
+            if not getattr(part, "thought", False):
+                regular_parts.append(text)
+            elif extract_thoughts:
+                thought_parts.append(text)
+
+    return ("\n".join(regular_parts), "\n".join(thought_parts))
+
+
+@retry(
+ stop=stop_after_attempt(3),
+ wait=wait_exponential(multiplier=1, min=4, max=60),
+ retry=(
+ retry_if_exception_type(google_api_exceptions.InternalServerError)
+ | retry_if_exception_type(google_api_exceptions.ServiceUnavailable)
+ | retry_if_exception_type(google_api_exceptions.ResourceExhausted)
+ | retry_if_exception_type(google_api_exceptions.GatewayTimeout)
+ | retry_if_exception_type(google_api_exceptions.BadGateway)
+ | retry_if_exception_type(google_api_exceptions.DeadlineExceeded)
+ | retry_if_exception_type(google_api_exceptions.Aborted)
+ | retry_if_exception_type(google_api_exceptions.Unknown)
+ | retry_if_exception_type(InvalidResponseError)
+ ),
+)
+async def gemini_complete_if_cache(
+ model: str,
+ prompt: str,
+ system_prompt: str | None = None,
+ history_messages: list[dict[str, Any]] | None = None,
+ enable_cot: bool = False,
+ base_url: str | None = None,
+ api_key: str | None = None,
+ token_tracker: Any | None = None,
+ stream: bool | None = None,
+ keyword_extraction: bool = False,
+ generation_config: dict[str, Any] | None = None,
+ timeout: int | None = None,
+ **_: Any,
+) -> str | AsyncIterator[str]:
+ """
+ Complete a prompt using Gemini's API with Chain of Thought (COT) support.
+
+ This function supports automatic integration of reasoning content from Gemini models
+ that provide Chain of Thought capabilities via the thinking_config API feature.
+
+ COT Integration:
+ - When enable_cot=True: Thought content is wrapped in a?iteratorDone():iteratorValue(s,u,i[o?a-u++:u++])}))},createClass(ObjectSeq,KeyedSeq),ObjectSeq.prototype.get=function(s,o){return void 0===o||this.has(s)?this._object[s]:o},ObjectSeq.prototype.has=function(s){return this._object.hasOwnProperty(s)},ObjectSeq.prototype.__iterate=function(s,o){for(var i=this._object,a=this._keys,u=a.length-1,_=0;_<=u;_++){var w=a[o?u-_:_];if(!1===s(i[w],w,this))return _+1}return _},ObjectSeq.prototype.__iterator=function(s,o){var i=this._object,a=this._keys,u=a.length-1,_=0;return new Iterator((function(){var w=a[o?u-_:_];return _++>u?iteratorDone():iteratorValue(s,w,i[w])}))},ObjectSeq.prototype[u]=!0,createClass(IterableSeq,IndexedSeq),IterableSeq.prototype.__iterateUncached=function(s,o){if(o)return this.cacheResult().__iterate(s,o);var i=getIterator(this._iterable),a=0;if(isIterator(i))for(var u;!(u=i.next()).done&&!1!==s(u.value,a++,this););return a},IterableSeq.prototype.__iteratorUncached=function(s,o){if(o)return this.cacheResult().__iterator(s,o);var i=getIterator(this._iterable);if(!isIterator(i))return new Iterator(iteratorDone);var a=0;return new Iterator((function(){var o=i.next();return o.done?o:iteratorValue(s,a++,o.value)}))},createClass(IteratorSeq,IndexedSeq),IteratorSeq.prototype.__iterateUncached=function(s,o){if(o)return this.cacheResult().__iterate(s,o);for(var i,a=this._iterator,u=this._iteratorCache,_=0;_", "\\n\\n"]\')',
+ }
+
+
+@dataclass
+class OllamaEmbeddingOptions(_OllamaOptionsMixin, BindingOptions):
+    """Ollama binding options used for embedding calls.
+
+    Declares no fields of its own; only the binding name is set here.
+    """
+
+    # Mandatory binding name.
+    _binding_name: ClassVar[str] = "ollama_embedding"
+
+
+@dataclass
+class OllamaLLMOptions(_OllamaOptionsMixin, BindingOptions):
+    """Ollama binding options used for LLM calls.
+
+    Declares no fields of its own; only the binding name is set here.
+    """
+
+    # Mandatory binding name.
+    _binding_name: ClassVar[str] = "ollama_llm"
+
+
+# =============================================================================
+# Binding Options for Gemini
+# =============================================================================
+@dataclass
+class GeminiLLMOptions(BindingOptions):
+    """Generation options for the Google Gemini LLM binding."""
+
+    # Mandatory binding name.
+    _binding_name: ClassVar[str] = "gemini_llm"
+
+    # Sampling controls
+    temperature: float = DEFAULT_TEMPERATURE
+    top_p: float = 0.95
+    top_k: int = 40
+
+    # Output shape and penalties
+    max_output_tokens: int | None = None
+    candidate_count: int = 1
+    presence_penalty: float = 0.0
+    frequency_penalty: float = 0.0
+    stop_sequences: List[str] = field(default_factory=list)
+    seed: int | None = None
+
+    # Structured (JSON) settings
+    thinking_config: dict | None = None
+    safety_settings: dict | None = None
+
+    # Help text per option, keyed by field name.
+    _help: ClassVar[dict[str, str]] = {
+        "temperature": "Controls randomness (0.0-2.0, higher = more creative)",
+        "top_p": "Nucleus sampling parameter (0.0-1.0)",
+        "top_k": "Limits sampling to the top K tokens (1 disables the limit)",
+        "max_output_tokens": "Maximum tokens generated in the response",
+        "candidate_count": "Number of candidates returned per request",
+        "presence_penalty": "Penalty for token presence (-2.0 to 2.0)",
+        "frequency_penalty": "Penalty for token frequency (-2.0 to 2.0)",
+        "stop_sequences": "Stop sequences (JSON array of strings, e.g., '[\"END\"]')",
+        "seed": "Random seed for reproducible generation (leave empty for random)",
+        "thinking_config": "Thinking configuration (JSON dict, e.g., '{\"thinking_budget\": 1024}' or '{\"include_thoughts\": true}')",
+        "safety_settings": "JSON object with Gemini safety settings overrides",
+    }
+
+
+@dataclass
+class GeminiEmbeddingOptions(BindingOptions):
+    """Options for the Google Gemini embedding binding."""
+
+    # Mandatory binding name.
+    _binding_name: ClassVar[str] = "gemini_embedding"
+
+    # Optional embedding task type; None leaves the choice to the caller's context.
+    task_type: str | None = None
+
+    # Help text per option, keyed by field name.
+    _help: ClassVar[dict[str, str]] = {
+        "task_type": (
+            "Task type for embedding optimization. "
+            "If not specified, automatically determined from context "
+            "(RETRIEVAL_QUERY for queries, RETRIEVAL_DOCUMENT for documents). "
+            "Supported types: RETRIEVAL_DOCUMENT, RETRIEVAL_QUERY, "
+            "SEMANTIC_SIMILARITY, CLASSIFICATION, CLUSTERING, "
+            "CODE_RETRIEVAL_QUERY, QUESTION_ANSWERING, FACT_VERIFICATION"
+        ),
+    }
+
+
+# =============================================================================
+# Binding Options for OpenAI
+# =============================================================================
+#
+# OpenAI binding options provide configuration for OpenAI's API and Azure OpenAI.
+# These options control model behavior, sampling parameters, and generation settings.
+# The parameters are based on OpenAI's API specification and provide fine-grained
+# control over model inference and generation.
+#
+# =============================================================================
+@dataclass
+class OpenAILLMOptions(BindingOptions):
+ """Options for OpenAI LLM with configuration for OpenAI and Azure OpenAI API calls."""
+
+ # mandatory name of binding
+ _binding_name: ClassVar[str] = "openai_llm"
+
+ # Sampling and generation parameters
+ frequency_penalty: float = 0.0 # Penalty for token frequency (-2.0 to 2.0)
+ max_completion_tokens: int = None # Maximum number of tokens to generate
+ presence_penalty: float = 0.0 # Penalty for token presence (-2.0 to 2.0)
+ reasoning_effort: str = "medium" # Reasoning effort level (low, medium, high)
+ safety_identifier: str = "" # Safety identifier for content filtering
+ service_tier: str = "" # Service tier for API usage
+ stop: List[str] = field(default_factory=list) # Stop sequences
+ temperature: float = DEFAULT_TEMPERATURE # Controls randomness (0.0 to 2.0)
+ top_p: float = 1.0 # Nucleus sampling parameter (0.0 to 1.0)
+ max_tokens: int = None # Maximum number of tokens to generate(deprecated, use max_completion_tokens instead)
+ extra_body: dict = None # Extra body parameters for OpenRouter of vLLM
+
+ # Help descriptions
+ _help: ClassVar[dict[str, str]] = {
+ "frequency_penalty": "Penalty for token frequency (-2.0 to 2.0, positive values discourage repetition)",
+ "max_completion_tokens": "Maximum number of tokens to generate (optional, leave empty for model default)",
+ "presence_penalty": "Penalty for token presence (-2.0 to 2.0, positive values encourage new topics)",
+ "reasoning_effort": "Reasoning effort level for o1 models (low, medium, high)",
+ "safety_identifier": "Safety identifier for content filtering (optional)",
+ "service_tier": "Service tier for API usage (optional)",
+ "stop": 'Stop sequences (JSON array of strings, e.g., \'["
|
", "", name, flags=re.IGNORECASE) + name = re.sub(r"|
+
+
+
+
+ |
+
+
+ |
+
|
+
+
+ 📸
+
+ RAG-Anything+ Multimodal RAG + + |
+
+
+
+ 🎥
+
+ VideoRAG+ Extreme Long-Context Video RAG + + |
+
+
+
+ ✨
+
+ MiniRAG+ Extremely Simple RAG + + |
+
{{ message.text }}
-{{ block.text }}
-| {{ header }} | -
|---|
| {{ cell }} | -
系统正在读取当前记录的结构化信息。
-{{ error }}
- -{{ hermesRun.result_summary || '暂无运行摘要。' }}
-围绕当前 Hermes 任务查看关键字段。
-按工具调用顺序查看执行链。
-查看当前工具调用的请求与返回。
-{{ formatJson(selectedToolCall.request_json) }}
- {{ formatJson(selectedToolCall.response_json) }}
- 保留 Hermes 路由与进度原文,便于管理员核查。
-{{ formatJson(hermesRun.route_json) }}
- {{ systemEntry.event_type }} · {{ systemEntry.logger || '未标记模块' }}
-按单条系统日志提取出的结构化字段。
-系统对当前日志的归类与处理建议。
-保留该条记录的完整原文,便于排障核对。
-{{ systemEntry.raw }}
- 系统正在读取当前记录的结构化信息。
+{{ error }}
+ +{{ hermesRun.result_summary || '暂无运行摘要。' }}
+围绕当前 Hermes 任务查看关键字段。
+按工具调用顺序查看执行链。
+查看当前工具调用的请求与返回。
+{{ formatJson(selectedToolCall.request_json) }}
+ {{ formatJson(selectedToolCall.response_json) }}
+ 保留 Hermes 路由与进度原文,便于管理员核查。
+{{ formatJson(hermesRun.route_json) }}
+ {{ systemEntry.event_type }} · {{ systemEntry.logger || '未标记模块' }}
+按单条系统日志提取出的结构化字段。
+系统对当前日志的归类与处理建议。
+保留该条记录的完整原文,便于排障核对。
+{{ systemEntry.raw }}
+ 默认展示文件列表,点击具体文件后以弹窗方式展开预览。
-