修改了数据集上传,修改了模型配置页面

This commit is contained in:
2026-01-14 16:35:22 +08:00
parent e1ab76a9a1
commit 43b6018a3e
19 changed files with 1058 additions and 1231 deletions

39
request/_backend.bat Normal file
View File

@@ -0,0 +1,39 @@
@echo off
chcp 65001 >nul
setlocal enabledelayedexpansion
:: 读取配置
cd /d "%~dp0"
powershell -Command "try { $config = Get-Content ../config.json | ConvertFrom-Json; Write-Host $config.backend.host; } catch { Write-Host 'ERROR'; exit 1; }" > temp_host.txt
set /p backend_host=<temp_host.txt
powershell -Command "try { $config = Get-Content ../config.json | ConvertFrom-Json; Write-Host $config.backend.port; } catch { Write-Host 'ERROR'; exit 1; }" > temp_port.txt
set /p backend_port=<temp_port.txt
powershell -Command "try { $config = Get-Content ../config.json | ConvertFrom-Json; Write-Host $config.backend.python_path; } catch { Write-Host 'ERROR'; exit 1; }" > temp_python.txt
set /p python_path=<temp_python.txt
powershell -Command "try { $config = Get-Content ../config.json | ConvertFrom-Json; Write-Host $config.backend.main_module; } catch { Write-Host 'ERROR'; exit 1; }" > temp_module.txt
set /p main_module=<temp_module.txt
powershell -Command "try { $config = Get-Content ../config.json | ConvertFrom-Json; Write-Host $config.backend.log_level; } catch { Write-Host 'INFO'; exit 1; }" > temp_loglevel.txt
set /p log_level=<temp_loglevel.txt
del temp_host.txt temp_port.txt temp_python.txt temp_module.txt temp_loglevel.txt 2>nul
echo [后端] 启动配置:
echo [后端] 主机: !backend_host!
echo [后端] 端口: !backend_port!
echo [后端] 日志级别: !log_level!
echo [后端] 主模块: !main_module!
echo.
:: 设置环境变量
set PYTHONPATH=src
set LOGLEVEL=!log_level!
:: 启动后端服务
echo [后端] 正在启动服务...
!python_path! -m uvicorn !main_module! --host !backend_host! --port !backend_port!
pause

View File

@@ -1,84 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# 永远从脚本所在目录运行(避免在别的目录执行导致路径错误)
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
echo "🧹 X-Request 框架环境清理"
echo "=========================="
# 函数:加载 .env 文件中的变量
load_env_file() {
local env_file=".env"
if [ -f "$env_file" ]; then
while IFS='=' read -r key value; do
[[ "$key" =~ ^#.*$ ]] && continue
[[ -z "$key" ]] && continue
value=$(echo "$value" | sed 's/^["'\'']//' | sed 's/["'\'']$//')
export "$key=$value"
done < "$env_file"
fi
}
# 加载环境配置
load_env_file
# 检查虚拟环境是否存在
if [ ! -d "xrequest" ]; then
echo "⚠️ 虚拟环境不存在,无需清理"
exit 0
fi
echo "📋 检测到虚拟环境: xrequest"
# 询问用户确认
read -p "确定要删除虚拟环境吗?(y/N): " confirm
if [[ ! "$confirm" =~ ^[Yy]$ ]]; then
echo "❌ 操作已取消"
exit 0
fi
# 删除虚拟环境
echo "🗑️ 正在删除虚拟环境..."
rm -rf xrequest
if [ $? -eq 0 ]; then
echo "✅ 虚拟环境已删除"
else
echo "❌ 虚拟环境删除失败"
exit 1
fi
# 询问是否清理日志
if [ -d "${LOGS_DIR:-logs}" ]; then
echo ""
read -p "是否也要清理日志目录?(y/N): " clean_logs
if [[ "$clean_logs" =~ ^[Yy]$ ]]; then
echo "🗑️ 正在清理日志目录..."
rm -rf "${LOGS_DIR:-logs}"
if [ $? -eq 0 ]; then
echo "✅ 日志目录已清理"
else
echo "⚠️ 日志目录清理失败"
fi
fi
fi
# 询问是否清理 __pycache__ 和 .pyc 文件
echo ""
read -p "是否清理 Python 缓存文件 (__pycache__, *.pyc)(y/N): " clean_cache
if [[ "$clean_cache" =~ ^[Yy]$ ]]; then
echo "🗑️ 正在清理 Python 缓存..."
find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
find . -type f -name "*.pyc" -delete 2>/dev/null || true
echo "✅ Python 缓存已清理"
fi
echo ""
echo "🎉 清理完成!"
echo ""
echo "📝 如需重新设置环境,请运行:"
echo " ./setup.sh"
echo ""

View File

@@ -1 +1 @@
3807
2916

View File

@@ -111,168 +111,64 @@ class DatasetAPI(BaseAPI):
return StandardResponse.error(f"上传失败: {str(e)}")
@get("", response_model=StandardResponse)
async def list_datasets(self, list_all: bool = False):
async def list_datasets(self):
"""
获取所有数据集列表
Args:
list_all: 是否列出data目录下的所有文件物理文件默认False只列出API上传的文件
获取数据集列表只返回filename_mapping.json中记录的文件
Returns:
StandardResponse: 包含数据集列表的标准响应
"""
# 添加调试日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"list_datasets called with list_all={list_all}")
try:
if list_all:
# 列出data目录下的所有文件物理文件
import json
from pathlib import Path
import json
data_dir = file_upload_service.upload_dir
mapping_file = data_dir / "filename_mapping.json"
data_dir = file_upload_service.upload_dir
mapping_file = data_dir / "filename_mapping.json"
# 读取文件名映射
mappings = {}
if mapping_file.exists():
try:
with open(mapping_file, 'r', encoding='utf-8') as f:
mapping_data = json.load(f)
mappings = mapping_data.get("mappings", {})
except Exception:
mappings = {}
# 读取文件名映射
mappings = {}
if mapping_file.exists():
with open(mapping_file, 'r', encoding='utf-8') as f:
mapping_data = json.load(f)
mappings = mapping_data.get("mappings", {})
# 获取data目录下的所有JSON文件
datasets = []
if data_dir.exists():
for file_path in data_dir.iterdir():
# 跳过目录和映射文件本身
if file_path.is_file() and file_path.name != "filename_mapping.json":
file_id = file_path.stem # 去掉.json后缀得到file_id
# 根据映射文件构建数据集列表
datasets = []
for file_id, mapping_info in mappings.items():
# 获取物理文件信息
storage_filename = mapping_info.get("storage_filename", f"{file_id}.json")
file_path = data_dir / storage_filename
# 从映射文件获取真实文件名
mapping_info = mappings.get(file_id, {})
original_filename = mapping_info.get("original_filename", file_path.name)
uploaded_at = mapping_info.get("uploaded_at", "")
if file_path.exists():
# 获取文件大小
file_size = file_path.stat().st_size
size_mb = round(file_size / 1024 / 1024, 2)
size_display = format_file_size(file_size)
# 获取文件大小
file_size = file_path.stat().st_size
datasets.append({
"file_id": file_id,
"name": mapping_info.get("original_filename", ""),
"size": file_size,
"size_mb": size_mb,
"size_display": size_display,
"status": "已处理",
"description": mapping_info.get("original_filename", ""),
"uploaded_at": mapping_info.get("uploaded_at", ""),
"download_count": 0,
"is_physical_file": True
})
# 格式化文件大小
size_mb = round(file_size / 1024 / 1024, 2)
size_display = format_file_size(file_size)
# 按上传时间排序
datasets.sort(key=lambda x: x["uploaded_at"], reverse=True)
datasets.append({
"file_id": file_id,
"name": original_filename,
"size": file_size,
"size_mb": size_mb,
"size_display": size_display,
"status": "已处理",
"description": mapping_info.get("original_filename", "") if mapping_info else "",
"uploaded_at": uploaded_at,
"download_count": 0,
"is_physical_file": True
})
# 按文件名排序
datasets.sort(key=lambda x: x["name"])
return StandardResponse.success({
"datasets": datasets,
"total": len(datasets),
"source": "physical_files"
})
else:
# 获取所有文件API上传的文件
all_files = file_upload_service.get_all_files()
# 转换为前端期望的格式
datasets = []
for uploaded_file in all_files:
# 只返回JSON/JSONL文件数据集文件
file_ext = uploaded_file.original_filename.lower().split('.')[-1] if '.' in uploaded_file.original_filename else ''
if file_ext in ['json', 'jsonl']:
# 获取文件名映射(显示真实文件名)
mapping = file_upload_service.get_filename_mapping(uploaded_file.file_id)
display_name = mapping["original_filename"] if mapping else uploaded_file.original_filename
# 格式化文件大小
size_mb = round(uploaded_file.file_size / 1024 / 1024, 2)
size_display = format_file_size(uploaded_file.file_size)
datasets.append({
"file_id": uploaded_file.file_id,
"name": display_name,
"size": uploaded_file.file_size,
"size_mb": size_mb,
"size_display": size_display,
"status": "已处理",
"description": uploaded_file.description or "",
"uploaded_at": uploaded_file.uploaded_at,
"download_count": uploaded_file.download_count,
"is_physical_file": False
})
return StandardResponse.success({
"datasets": datasets,
"total": len(datasets),
"source": "api_uploaded"
})
return StandardResponse.success({
"datasets": datasets,
"total": len(datasets),
"source": "filename_mapping"
})
except Exception as e:
return StandardResponse.error(f"获取数据集列表失败: {str(e)}")
@get("/{file_id}", response_model=StandardResponse)
async def get_dataset(self, file_id: str):
"""
获取特定数据集的详细信息
Args:
file_id: 文件ID
Returns:
StandardResponse: 包含数据集详情的标准响应
"""
try:
file_info = file_upload_service.get_file(file_id)
if not file_info:
return StandardResponse.error(f"数据集 {file_id} 不存在")
# 转换为前端期望的格式
# 显示真实文件名(从映射文件中获取)
mapping = file_upload_service.get_filename_mapping(file_info.file_id)
display_name = mapping["original_filename"] if mapping else file_info.original_filename
# 格式化文件大小
size_mb = round(file_info.file_size / 1024 / 1024, 2)
size_display = format_file_size(file_info.file_size)
dataset_info = {
"file_id": file_info.file_id,
"name": display_name,
"size": file_info.file_size,
"size_mb": size_mb,
"size_display": size_display,
"status": "已处理",
"description": file_info.description or "",
"uploaded_at": file_info.uploaded_at,
"updated_at": file_info.updated_at,
"download_count": file_info.download_count,
"content_type": file_info.content_type,
"file_hash": file_info.file_hash
}
return StandardResponse.success(dataset_info)
except Exception as e:
return StandardResponse.error(f"获取数据集详情失败: {str(e)}")
@get("/{file_id}", response_model=StandardResponse)
async def get_dataset(self, file_id: str):
"""

View File

@@ -7,10 +7,8 @@ import os
import json
import uuid
import httpx
import asyncio
from typing import List, Dict, Any, Optional
from fastapi import HTTPException, Body, Response
from fastapi.responses import StreamingResponse
import logging
# 导入基类
@@ -27,81 +25,6 @@ MODELS_CONFIG_PATH = os.path.join(BASE_DIR, "..", "..", "models", "models.json")
MODELS_CONFIG_PATH = os.path.normpath(MODELS_CONFIG_PATH)
async def handle_streaming_response(response: httpx.Response, model: Dict[str, Any], attempt: int, max_attempts: int) -> Dict[str, Any]:
"""处理流式响应"""
try:
if response.status_code != 200:
error_msg = f"API调用失败 (状态码: {response.status_code})"
try:
error_detail = response.json()
error_msg += f": {error_detail}"
except:
error_msg += f": {response.text}"
return {
'success': False,
'error': error_msg,
'status_code': response.status_code
}
full_content = ""
chunk_count = 0
usage = {}
# 处理流式数据
async for line in response.aiter_lines():
if line.startswith('data: '):
data_str = line[6:] # 移除 'data: ' 前缀
if data_str.strip() == '[DONE]':
break
try:
chunk = json.loads(data_str)
chunk_count += 1
# 提取内容
if 'choices' in chunk and len(chunk['choices']) > 0:
delta = chunk['choices'][0].get('delta', {})
if 'content' in delta:
content = delta['content']
full_content += content
# 提取使用统计
if 'usage' in chunk:
usage = chunk['usage']
except json.JSONDecodeError:
continue
logger.info(f"流式响应完成 (尝试 {attempt}/{max_attempts}) - 接收 {chunk_count} 个数据块")
return {
'success': True,
'model': model['name'],
'content': full_content,
'usage': usage,
'streaming': True,
'chunks_received': chunk_count,
'raw_response': {
'object': 'chat.completion',
'model': model.get('version', 'unknown'),
'choices': [{
'message': {'role': 'assistant', 'content': full_content},
'finish_reason': 'stop'
}],
'usage': usage
}
}
except Exception as e:
logger.error(f"流式响应处理错误: {str(e)}")
return {
'success': False,
'error': f'流式响应处理失败: {str(e)}',
'error_type': type(e).__name__
}
class ModelManager:
"""模型配置管理器"""
@@ -227,245 +150,124 @@ class ModelManager:
@staticmethod
async def call_model(model_id: str, prompt: str, system_prompt: Optional[str] = None) -> Dict[str, Any]:
"""真实调用模型API"""
"""真实调用模型API - 简化版本"""
model = ModelManager.get_model_by_id(model_id)
if not model:
raise HTTPException(status_code=404, detail="模型不存在")
return {'success': False, 'error': '模型不存在'}
# 检查API配置
api_url = model.get('apiUrl', '').strip()
api_key = model.get('apiKey', '').strip()
version = model.get('version', '').strip()
provider = model.get('provider', '').strip()
# 调试日志
logger.info(f"Model {model.get('name')} config", extra={
'provider': provider,
'api_url': api_url,
'version': version,
'has_api_key': bool(api_key),
'api_key_length': len(api_key) if api_key else 0
})
# 记录调试信息
model_name = str(model.get('name', 'unknown')) if model.get('name') is not None else 'unknown'
logger.info(f"测试模型: {model_name}")
logger.info(f"API地址: {api_url}")
logger.info(f"模型版本: {version}")
logger.info(f"API密钥长度: {len(api_key) if api_key else 0}")
if not api_url:
raise HTTPException(status_code=400, detail="模型API地址未配置")
return {'success': False, 'error': 'API地址未配置'}
if not version:
raise HTTPException(status_code=400, detail="模型版本未配置")
return {'success': False, 'error': '模型版本未配置'}
# 准备请求数据
request_data = {
"model": version,
"messages": [],
"temperature": model.get('temperature', 0.7),
"top_p": model.get('topP', 1.0),
"max_tokens": model.get('maxTokens', 2048),
"stream": model.get('streaming', False)
}
# 准备请求数据 - 标准的OpenAI格式
messages = []
# 添加系统提示词
if system_prompt:
request_data["messages"].append({
"role": "system",
"content": system_prompt
})
messages.append({"role": "system", "content": system_prompt})
elif model.get('systemPrompt'):
request_data["messages"].append({
"role": "system",
"content": model.get('systemPrompt')
})
system_prompt_content = model.get('systemPrompt')
if system_prompt_content:
messages.append({"role": "system", "content": system_prompt_content})
# 添加用户提示词
request_data["messages"].append({
"role": "user",
"content": prompt
})
messages.append({"role": "user", "content": prompt})
# 设置请求头
logger.info(f"准备发送的消息: {messages}")
request_data = {
"model": version,
"messages": messages,
"temperature": model.get('temperature', 0.7),
"top_p": model.get('topP', 1.0),
"max_tokens": model.get('maxTokens', 2048),
"stream": False # 强制使用非流式响应
}
# 设置请求头 - 根据是否有API密钥决定认证方式
headers = {
"Content-Type": "application/json"
}
# 添加API密钥(如果提供)
# 只有在有API密钥时才添加认证头
if api_key:
provider_lower = provider.lower()
headers["Authorization"] = f"Bearer {api_key}"
# 根据提供商类型设置认证头
if provider_lower == 'anthropic':
# Anthropic API
headers["x-api-key"] = api_key
headers["anthropic-version"] = "2023-06-01"
logger.info(f"设置Anthropic认证头x-api-key长度: {len(api_key)}")
else:
# OpenAI 和其他兼容的API包括自定义
headers["Authorization"] = f"Bearer {api_key}"
logger.info(f"设置Bearer认证头key长度: {len(api_key)}")
try:
# 发送API请求 - 只支持OpenAI兼容格式
timeout = httpx.Timeout(30)
async with httpx.AsyncClient(timeout=timeout) as client:
# 记录请求信息隐藏API密钥
masked_headers = {k: (v[:10] + '...' if k == 'Authorization' and len(v) > 10 else v) for k, v in headers.items()}
logger.info(f"发送请求到: {api_url.rstrip('/')}/chat/completions")
logger.info(f"请求头: {masked_headers}")
logger.info(f"请求体: {request_data}")
# 记录最终使用的请求头隐藏API key
logger.info(f"最终请求头", extra={
'headers': {k: v if k != 'Authorization' and k != 'x-api-key' else '***HIDDEN***' for k, v in headers.items()},
'api_key_masked': f"{api_key[:4]}...{api_key[-4:]}" if len(api_key) > 8 else '***'
})
else:
logger.warning(f"未提供API key for model {model.get('name')}")
# 记录最终使用的请求头
logger.info(f"最终请求头", extra={
'headers': headers,
'api_key': 'NONE'
})
response = await client.post(
f"{api_url.rstrip('/')}/chat/completions",
headers=headers,
json=request_data
)
# 获取重试配置
max_retries = model.get('maxRetries', 3)
retry_delay = 2 # 重试间隔2秒
# 检查响应
logger.info(f"响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
logger.info(f"响应内容: {result}")
last_error = None
# 解析响应
if "choices" in result and len(result["choices"]) > 0:
message = result["choices"][0]["message"]
content = message.get("content", "") or ""
logger.info(f"原始响应内容: {message}")
logger.info(f"解析成功,收到回复长度: {len(content)}")
# 重试循环
for attempt in range(max_retries + 1):
try:
# 发送API请求
timeout = httpx.Timeout(model.get('timeout', 30))
provider = model.get('provider', '').lower()
async with httpx.AsyncClient(timeout=timeout) as client:
# 根据提供商类型调用不同的API端点
if provider_lower == 'anthropic':
# Anthropic API格式
anthropic_request = {
"model": version,
"max_tokens": request_data["max_tokens"],
"messages": request_data["messages"][1:] # 移除system messageAnthropic使用system参数
}
if system_prompt or model.get('systemPrompt'):
anthropic_request["system"] = system_prompt or model.get('systemPrompt')
logger.info(f"发送Anthropic API请求 (尝试 {attempt + 1}/{max_retries + 1})", extra={
'url': f"{api_url.rstrip('/')}/messages",
'headers': {k: v if k != 'x-api-key' else '***HIDDEN***' for k, v in headers.items()},
'request_body': anthropic_request
})
response = await client.post(
f"{api_url.rstrip('/')}/messages",
headers=headers,
json=anthropic_request
)
else:
# OpenAI 和其他兼容的API包括自定义、本地部署等
logger.info(f"发送OpenAI兼容API请求 (尝试 {attempt + 1}/{max_retries + 1})", extra={
'url': f"{api_url.rstrip('/')}/chat/completions",
'headers': {k: v if k != 'Authorization' else '***HIDDEN***' for k, v in headers.items()},
'request_body': request_data
})
response = await client.post(
f"{api_url.rstrip('/')}/chat/completions",
headers=headers,
json=request_data
)
# 处理流式响应
if request_data.get('stream', False):
return await handle_streaming_response(response, model, attempt + 1, max_retries + 1)
# 检查响应状态(非流式)
if response.status_code == 200:
result = response.json()
# 解析响应 - 根据提供商类型
provider = model.get('provider', '').lower()
if provider == 'anthropic' and "content" in result and len(result["content"]) > 0:
# Anthropic格式响应
content = result["content"][0]["text"]
logger.info(f"API调用成功 (尝试 {attempt + 1})")
return {
'success': True,
'model': model['name'],
'content': content,
'usage': result.get('usage', {}),
'raw_response': result
}
elif "choices" in result and len(result["choices"]) > 0:
# OpenAI和其他兼容API格式响应
content = result["choices"][0]["message"]["content"]
logger.info(f"API调用成功 (尝试 {attempt + 1})")
return {
'success': True,
'model': model['name'],
'content': content,
'usage': result.get('usage', {}),
'raw_response': result
}
# 如果内容为空,记录警告但仍然返回成功
if not content:
logger.warning("API返回的内容为空")
return {
'success': False,
'error': '无法解析API响应',
'success': True,
'model': str(model.get('name', 'unknown')),
'content': content,
'usage': result.get('usage', {}),
'raw_response': result
}
else:
error_msg = f"API调用失败 (状态码: {response.status_code})"
try:
error_detail = response.json()
error_msg += f": {error_detail}"
except:
error_msg += f": {response.text}"
last_error = {
'success': False,
'error': error_msg,
'status_code': response.status_code
}
# 如果不是最后一次尝试,等待后重试
if attempt < max_retries:
logger.warning(f"API调用失败{retry_delay}秒后重试 (尝试 {attempt + 1}/{max_retries + 1}): {error_msg}")
await asyncio.sleep(retry_delay)
continue
else:
# 最后一次尝试失败,返回错误
return last_error
except httpx.TimeoutException as e:
last_error = {
'success': False,
'error': 'API调用超时',
'timeout': model.get('timeout', 30)
}
logger.warning(f"API调用超时 (尝试 {attempt + 1}/{max_retries + 1})")
# 如果不是最后一次尝试,等待后重试
if attempt < max_retries:
await asyncio.sleep(retry_delay)
continue
logger.error(f"API响应格式异常: {result}")
return {'success': False, 'error': 'API响应格式异常'}
else:
return last_error
# 获取错误信息
try:
error_detail = response.json()
error_detail_str = str(error_detail) if error_detail is not None else '无错误详情'
logger.error(f"API错误响应: {error_detail}")
except:
error_detail_str = response.text or '未知错误'
logger.error(f"API错误响应文本: {error_detail_str}")
except httpx.RequestError as e:
last_error = {
'success': False,
'error': f'网络请求错误: {str(e)}'
}
logger.warning(f"网络请求错误 (尝试 {attempt + 1}/{max_retries + 1}): {str(e)}")
# 如果不是最后一次尝试,等待后重试
if attempt < max_retries:
await asyncio.sleep(retry_delay)
continue
else:
return last_error
return {
'success': False,
'error': f'API调用失败 ({response.status_code}): {error_detail_str}'
}
except Exception as e:
last_error = {
'success': False,
'error': f'调用失败: {str(e)}',
'error_type': type(e).__name__
}
logger.error(f"未知错误 (尝试 {attempt + 1}/{max_retries + 1}): {str(e)}")
# 其他错误不重试,直接返回
return last_error
# 如果所有重试都失败,返回最后一次错误
return last_error
except Exception as e:
error_str = str(e) if e is not None else '未知异常'
return {'success': False, 'error': f'调用异常: {error_str}'}
@staticmethod
def test_model(model_id: str) -> Dict[str, Any]:
@@ -530,7 +332,8 @@ class ModelManager:
message = '连接测试成功'
else:
model['status'] = '连接失败'
message = f'连接测试失败: {test_result.get("error", "未知错误")}'
error = test_result.get("error")
message = f'连接测试失败: {str(error) if error else "未知错误"}'
# 保存更新
ModelManager.update_model(model_id, {'status': model['status']})
@@ -656,7 +459,8 @@ class ModelsAPI(BaseAPI):
message = '连接测试成功'
else:
model['status'] = '连接失败'
message = f'连接测试失败: {result.get("error", "未知错误")}'
error = result.get("error")
message = f'连接测试失败: {str(error) if error else "未知错误"}'
# 保存更新
ModelManager.update_model(model_id, {'status': model['status']})
@@ -678,157 +482,28 @@ class ModelsAPI(BaseAPI):
@post("/{model_id}/call")
async def call_model_api(self, model_id: str, request_data: Dict[str, Any] = Body(...)):
"""调用模型进行对话(支持流式输出"""
"""调用模型进行对话(使用非流式响应"""
try:
# 获取请求参数
prompt = request_data.get('prompt', '')
system_prompt = request_data.get('systemPrompt', None)
stream_enabled = request_data.get('stream', False)
if not prompt:
raise HTTPException(status_code=400, detail="提示词不能为空")
# 如果启用流式,返回流式响应
if stream_enabled:
return StreamingResponse(
self._stream_data(model_id, prompt, system_prompt),
media_type="text/plain",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Transfer-Encoding": "chunked"
}
)
else:
# 普通非流式调用
result = await ModelManager.call_model(
model_id=model_id,
prompt=prompt,
system_prompt=system_prompt
)
return self.success(result, "模型调用成功" if result.get('success') else "模型调用失败")
# 强制使用非流式调用
result = await ModelManager.call_model(
model_id=model_id,
prompt=prompt,
system_prompt=system_prompt
)
return self.success(result, "模型调用成功" if result.get('success') else "模型调用失败")
except HTTPException:
raise
except Exception as e:
self.logger.error(f"调用模型失败: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
async def _stream_data(self, model_id: str, prompt: str, system_prompt: Optional[str] = None):
"""真正的流式输出数据"""
try:
# 重新调用模型,获取流式响应
model = ModelManager.get_model_by_id(model_id)
if not model:
yield "data: " + json.dumps({"error": "模型不存在", "done": True}) + "\n\n"
return
# 设置流式请求
api_url = model.get('apiUrl', '').strip()
api_key = model.get('apiKey', '').strip()
version = model.get('version', '').strip()
provider = model.get('provider', '').strip()
# 准备请求数据
request_data = {
"model": version,
"messages": [],
"temperature": model.get('temperature', 0.7),
"top_p": model.get('topP', 1.0),
"max_tokens": model.get('maxTokens', 2048),
"stream": True # 启用流式
}
# 添加系统提示词
if system_prompt:
request_data["messages"].append({
"role": "system",
"content": system_prompt
})
elif model.get('systemPrompt'):
request_data["messages"].append({
"role": "system",
"content": model.get('systemPrompt')
})
# 添加用户提示词
request_data["messages"].append({
"role": "user",
"content": prompt
})
# 设置请求头
headers = {
"Content-Type": "application/json"
}
if api_key:
if provider.lower() == 'anthropic':
headers["x-api-key"] = api_key
headers["anthropic-version"] = "2023-06-01"
else:
headers["Authorization"] = f"Bearer {api_key}"
# 发送流式请求
async with httpx.AsyncClient(timeout=60) as client:
if provider.lower() == 'anthropic':
# Anthropic流式请求
anthropic_request = {
"model": version,
"max_tokens": request_data["max_tokens"],
"messages": request_data["messages"][1:],
"stream": True
}
if system_prompt or model.get('systemPrompt'):
anthropic_request["system"] = system_prompt or model.get('systemPrompt')
async with client.stream(
"POST",
f"{api_url.rstrip('/')}/messages",
headers=headers,
json=anthropic_request
) as response:
async for line in response.aiter_lines():
if line.startswith('data: '):
data_str = line[6:]
if data_str.strip() == '[DONE]':
yield "data: " + json.dumps({"content": "", "done": True}) + "\n\n"
break
try:
chunk_data = json.loads(data_str)
if "content" in chunk_data and len(chunk_data["content"]) > 0:
content = chunk_data["content"][0]["text"]
yield f"data: {json.dumps({'content': content, 'done': False})}\n\n"
except:
continue
else:
# OpenAI兼容流式请求
async with client.stream(
"POST",
f"{api_url.rstrip('/')}/chat/completions",
headers=headers,
json=request_data
) as response:
async for line in response.aiter_lines():
if line.startswith('data: '):
data_str = line[6:]
if data_str.strip() == '[DONE]':
yield "data: " + json.dumps({"content": "", "done": True}) + "\n\n"
break
try:
chunk_data = json.loads(data_str)
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
delta = chunk_data["choices"][0].get("delta", {})
if "content" in delta and delta["content"]:
content = delta["content"]
yield f"data: {json.dumps({'content': content, 'done': False})}\n\n"
except:
continue
except Exception as e:
yield "data: " + json.dumps({"error": str(e), "done": True}) + "\n\n"
@post("/batch-delete")
async def batch_delete_models(self, model_ids: List[str] = Body(...)):
"""批量删除模型"""

View File

@@ -1,7 +1,7 @@
from pydantic_settings import BaseSettings
from typing import Optional
import os
from pathlib import Path
import json
class Settings(BaseSettings):
@@ -12,21 +12,21 @@ class Settings(BaseSettings):
# 服务器配置
host: str = "0.0.0.0"
port: int = 1112
port: int = 3000 # 默认使用config.json中的端口
workers: int = 1
# 日志配置
log_level: str = "INFO"
log_file: Optional[str] = "logs/app.log"
log_format: str = "json"
log_to_console: bool = False
log_file: Optional[str] = "logs/app.log" # 默认写入日志文件
log_format: str = "json" # json 或 console
log_to_console: bool = False # 是否同时输出到控制台
# 高级日志配置
advanced_logging: bool = True
logs_dir: str = "logs"
max_log_days: int = 30
enable_log_cleanup: bool = True
route_based_logging: bool = True
advanced_logging: bool = True # 是否启用高级日志系统
logs_dir: str = "logs" # 日志目录
max_log_days: int = 30 # 日志文件保存天数
enable_log_cleanup: bool = True # 是否启用自动日志清理
route_based_logging: bool = True # 是否启用基于路由的日志分类
# 性能配置
max_requests: int = 1000
@@ -39,10 +39,35 @@ class Settings(BaseSettings):
cors_headers: list[str] = ["*"]
class Config:
env_file = str(Path(__file__).parent.parent.parent / ".env")
env_file_encoding = "utf-8"
env_file = ".env"
case_sensitive = False
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 尝试读取根目录的config.json配置文件
try:
# 获取项目根目录(相对于当前文件的路径)
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.abspath(os.path.join(current_dir, "..", ".."))
config_path = os.path.join(root_dir, "config.json")
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# 从config.json读取配置覆盖默认配置
if 'backend' in config:
backend_config = config['backend']
if 'port' in backend_config:
self.port = int(backend_config['port'])
if 'host' in backend_config:
self.host = backend_config['host']
if 'log_level' in backend_config:
self.log_level = backend_config['log_level']
except Exception as e:
# 如果读取配置文件失败,使用默认配置
print(f"警告: 无法读取配置文件 config.json使用默认配置. 错误: {e}")
# 全局设置实例
settings = Settings()

View File

@@ -224,7 +224,7 @@ def setup_routes(app: FastAPI):
@app.get("/routes")
async def get_routes_info():
"""获取所有路由信息"""
from ..api.discovery import get_registered_modules_info
from ..api.internal.discovery import get_registered_modules_info
response_data = {
"app_routes": [
@@ -246,6 +246,64 @@ def setup_routes(app: FastAPI):
registration_result = auto_register_routes(app)
# SSE模拟大模型输出端点在路由注册之后添加
@app.get("/sse")
async def sse_demo():
"""SSE模拟大模型流式输出演示"""
from fastapi.responses import StreamingResponse
import asyncio
import json
async def generate_ai_response():
"""模拟大模型的流式输出"""
# 模拟大模型回答关于人工智能的内容
response_parts = [
"人工智能AI是计算机科学的一个分支",
"它试图理解智能的实质,",
"并生产出一种新的能以人类智能相似的方式做出反应的智能机器。",
"包括机器人、语言识别、图像识别、自然语言处理和专家系统等。",
"",
"AI的发展可以追溯到1956年",
"当时在达特茅斯会议上,",
"约翰·麦卡锡首次提出了'人工智能'这一术语。",
"",
"经过几十年的发展,",
"AI已经取得了巨大的进步",
"特别是在深度学习、神经网络和大数据的推动下,",
"AI技术已经广泛应用于各个领域",
"包括医疗、金融、交通、教育、娱乐等。",
"",
"未来AI将继续发展",
"有望在更多领域发挥重要作用,",
"但同时也需要关注其带来的挑战,",
"如隐私保护、伦理问题、就业影响等。",
"",
"总的来说,",
"人工智能是人类智慧的结晶,",
"它将继续推动社会进步,",
"但我们也需要谨慎地对待它的发展和应用。"
]
for part in response_parts:
# 每部分作为一条消息发送
yield f"data: {json.dumps({'content': part, 'done': False})}\n\n"
# 模拟生成延迟
await asyncio.sleep(0.5)
# 发送完成信号
yield f"data: {json.dumps({'content': '', 'done': True})}\n\n"
return StreamingResponse(
generate_ai_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*"
}
)
# 挂载静态文件目录
from fastapi.responses import FileResponse
@@ -288,4 +346,7 @@ def get_app() -> FastAPI:
with _app_lock:
if _app_instance is None:
_app_instance = create_app()
return _app_instance
return _app_instance
# 导出应用实例
app = get_app()

View File

@@ -94,14 +94,88 @@ class FileUploadService:
# 文件信息存储(内存存储,可扩展为数据库)
self.files: Dict[str, UploadedFile] = {}
# 配置
self.max_file_size = 100 * 1024 * 1024 # 100MB 默认最大文件大小
self.allowed_extensions = None # None表示允许所有扩展名
# 初始化时加载已存在的文件信息
self._load_existing_files()
def _generate_file_id(self) -> str:
"""生成唯一文件ID"""
return str(uuid.uuid4())
def _load_existing_files(self):
"""
从文件名映射文件中加载已存在的文件信息
"""
try:
import json
from datetime import datetime
mapping_file = self.upload_dir / "filename_mapping.json"
if not mapping_file.exists():
print(f"[FileUploadService] 映射文件不存在,跳过加载")
return
print(f"[FileUploadService] 正在加载映射文件: {mapping_file}")
with open(mapping_file, 'r', encoding='utf-8') as f:
mapping_data = json.load(f)
mappings = mapping_data.get("mappings", {})
loaded_count = 0
for file_id, mapping_info in mappings.items():
try:
storage_filename = mapping_info.get("storage_filename", f"{file_id}.json")
file_path = self.upload_dir / storage_filename
# 检查物理文件是否存在
if not file_path.exists():
print(f"[FileUploadService] 警告: 文件不存在 {file_path}")
continue
# 获取文件大小
file_size = file_path.stat().st_size
# 创建UploadedFile对象
uploaded_file = UploadedFile(
file_id=file_id,
filename=storage_filename,
original_filename=mapping_info.get("original_filename", storage_filename),
file_path=str(file_path),
file_size=file_size,
content_type="application/json",
description=mapping_info.get("original_filename", storage_filename)
)
# 设置上传时间
uploaded_at = mapping_info.get("uploaded_at")
if uploaded_at:
uploaded_file.uploaded_at = uploaded_at
uploaded_file.updated_at = uploaded_at
# 计算文件哈希
try:
uploaded_file.file_hash = self._calculate_file_hash(file_path)
except Exception:
uploaded_file.file_hash = None
# 添加到内存字典
self.files[file_id] = uploaded_file
loaded_count += 1
print(f"[FileUploadService] 已加载文件: {uploaded_file.original_filename} (ID: {file_id})")
except Exception as e:
print(f"[FileUploadService] 加载文件 {file_id} 时出错: {e}")
continue
print(f"[FileUploadService] 加载完成,共加载 {loaded_count} 个文件")
except Exception as e:
print(f"[FileUploadService] 加载映射文件时出错: {e}")
import traceback
traceback.print_exc()
def _generate_storage_filename(self, original_filename: str, file_id: str) -> str:
"""生成存储文件名"""

10
request/start-backend.bat Normal file
View File

@@ -0,0 +1,10 @@
@echo off
echo Starting Backend API Server on Port 3000...
echo.
echo Backend API: http://localhost:3000/models/
echo Press Ctrl+C to stop the server
echo.
cd /d "%~dp0"
set PYTHONPATH=src
/d/Softwares/Anaconda/python -m uvicorn src.core.app:app --host 0.0.0.0 --port 3000
pause

View File

@@ -1 +1 @@
0bbf8a1a0807bf0cee9cf3cc5634f318
58852b1a007ac2ceb1f790a423c070f2

View File

@@ -854,20 +854,8 @@
"Datasets"
],
"summary": "list_datasets",
"description": "获取所有数据集列表\n\nArgs:\n list_all: 是否列出data目录下的所有文件物理文件默认False只列出API上传的文件)\n\nReturns:\n StandardResponse: 包含数据集列表的标准响应",
"description": "获取数据集列表只返回filename_mapping.json中记录的文件)\n\nReturns:\n StandardResponse: 包含数据集列表的标准响应",
"operationId": "list_datasets_datasets_get",
"parameters": [
{
"name": "list_all",
"in": "query",
"required": false,
"schema": {
"type": "boolean",
"default": false,
"title": "List All"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
@@ -878,16 +866,6 @@
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
@@ -984,7 +962,7 @@
"Models"
],
"summary": "call_model_api",
"description": "调用模型进行对话(支持流式输出",
"description": "调用模型进行对话(强制使用非流式响应",
"operationId": "call_model_api_models__model_id__call_post",
"parameters": [
{
@@ -1285,7 +1263,7 @@
"Models"
],
"summary": "test_model_connection",
"description": "测试模型连接",
"description": "测试模型连接 - 禁用流式响应",
"operationId": "test_model_connection_models__model_id__test_post",
"parameters": [
{
@@ -1319,6 +1297,23 @@
}
}
}
},
"/sse": {
"get": {
"summary": "Sse Demo",
"description": "SSE模拟大模型流式输出演示",
"operationId": "sse_demo_sse_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
}
}
}
}
}
}
},
"components": {

View File

@@ -0,0 +1,98 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE流式输出演示</title>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 800px;
margin: 50px auto;
padding: 20px;
background: #f5f5f5;
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
#messages {
font-family: 'Courier New', Courier, monospace;
margin: 20px;
padding: 20px;
border: 2px solid #ddd;
border-radius: 8px;
min-height: 300px;
background: white;
line-height: 1.6;
font-size: 16px;
}
.message {
margin-bottom: 10px;
}
.done-indicator {
margin-top: 20px;
padding: 10px;
background: #d4edda;
color: #155724;
border-radius: 5px;
text-align: center;
font-weight: bold;
}
</style>
</head>
<body>
<h1>🚀 大模型流式输出演示</h1>
<div id="messages">正在连接服务器...</div>
<script>
const messageBox = document.getElementById('messages');
let currentContent = '';
// 使用相对路径,避免跨域问题
const source = new EventSource('/sse');
source.onopen = function() {
console.log('SSE连接已建立');
messageBox.innerHTML = '正在生成内容...\n\n';
};
source.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
if (data.error) {
messageBox.innerHTML += '<div style="color: red;">错误: ' + data.error + '</div>';
source.close();
return;
}
if (data.content !== undefined && data.content !== null) {
// 追加内容
currentContent += data.content;
messageBox.innerHTML = currentContent;
// 自动滚动到底部
messageBox.scrollTop = messageBox.scrollHeight;
}
if (data.done) {
console.log('流式输出完成');
messageBox.innerHTML += '\n\n<div class="done-indicator">✅ 内容生成完成!</div>';
source.close();
}
} catch (e) {
console.error('解析数据失败:', e);
console.log('原始数据:', event.data);
}
};
source.onerror = function(error) {
console.error('SSE连接出错:', error);
messageBox.innerHTML += '<div style="color: red; margin-top: 20px;">❌ 连接失败</div>';
source.close();
};
</script>
</body>
</html>