feat(backend): 完善日志系统,支持按日期分目录存储

- 实现 logs/YYYY-MM-DD/ 日期文件夹结构
- 添加 success.log 和 failure.log 专用日志
- 使用 TimedRotatingFileHandler 实现按天切割
- 添加 log_success 和 log_failure 便捷函数
- 集成 markitdown 进行文件转换
- 优化文件存储路径,按项目ID分类存储

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Developer
2026-03-18 10:44:09 +08:00
parent 7514e7e763
commit 68453cead8
5 changed files with 276 additions and 112 deletions

View File

@@ -10,9 +10,11 @@ api_router = APIRouter()
# Include sub-routers
api_router.include_router(projects.router, prefix="/projects", tags=["projects"])
api_router.include_router(files.router, prefix="/files", tags=["files"])
api_router.include_router(chunks.router, prefix="/chunks", tags=["chunks"])
api_router.include_router(questions.router, prefix="/questions", tags=["questions"])
api_router.include_router(datasets.router, prefix="/datasets", tags=["datasets"])
api_router.include_router(eval.router, prefix="/eval", tags=["eval"])
# files, chunks, questions, datasets, eval 需要嵌套在 projects 下
# 通过 projects 路由中的子路由处理
api_router.include_router(files.router, prefix="/projects/{project_id}/files", tags=["files"])
api_router.include_router(chunks.router, prefix="/projects/{project_id}/chunks", tags=["chunks"])
api_router.include_router(questions.router, prefix="/projects/{project_id}/questions", tags=["questions"])
api_router.include_router(datasets.router, prefix="/projects/{project_id}/datasets", tags=["datasets"])
api_router.include_router(eval.router, prefix="/projects/{project_id}/eval", tags=["eval"])
api_router.include_router(models.router, prefix="/models", tags=["models"])

View File

@@ -2,6 +2,7 @@
Chunks API Router
"""
import asyncio
from pathlib import Path
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
@@ -13,19 +14,28 @@ from app.api.response import ApiResponse, PaginatedResponse
from app.core.database import get_db
from app.core.exceptions import NotFoundException
from app.core.crud import CRUDBase
from app.core.logging import log_success, log_failure
from app.models.models import Chunk, File
from app.schemas.chunk import ChunkResponse
from app.schemas.chunk import ChunkCreateSchema
from app.services.text_splitter.splitter import get_splitter
from app.services.file_processor.pdf_processor import process_pdf
from app.services.file_processor.docx_processor import process_docx
from app.services.file_processor.excel_processor import process_csv, process_excel
from markitdown import MarkItDown
router = APIRouter()
# Initialize CRUD
chunk_crud = CRUDBase(Chunk)
# Initialize markitdown
markitdown = MarkItDown()
def get_project_ready_dir(project_id: str) -> Path:
"""获取项目的 ready 文件目录"""
base_dir = Path("/data/code/YG-Datasets/data") / project_id / "ready"
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
class SplitRequest(BaseModel):
"""Request model for splitting text"""
@@ -37,28 +47,29 @@ class SplitRequest(BaseModel):
async def process_file_by_type(file: File) -> str:
"""Process file based on its type"""
"""Process file based on its type, convert to markdown"""
if not file.file_path:
raise NotFoundException("File", file.id)
processors = {
"pdf": process_pdf,
"docx": process_docx,
"xlsx": process_excel,
"csv": process_csv,
}
# Supported types for markitdown
markitdown_types = ["pdf", "docx", "doc", "pptx", "ppt", "xlsx", "xls", "htm", "html"]
processor = processors.get(file.file_type)
if not processor:
# Return raw text for txt, md files
if file.file_type in markitdown_types:
# Use markitdown to convert to markdown
loop = asyncio.get_event_loop()
content = await loop.run_in_executor(
result = await loop.run_in_executor(
None,
lambda: open(file.file_path, 'r', encoding='utf-8').read()
lambda: markitdown.convert(file.file_path)
)
return content
return result.text_content
return await processor(file.file_path)
# Return raw text for txt, md files
loop = asyncio.get_event_loop()
content = await loop.run_in_executor(
None,
lambda: open(file.file_path, 'r', encoding='utf-8').read()
)
return content
@router.post("/split", response_model=ApiResponse)
@@ -68,52 +79,97 @@ async def split_text(
db: AsyncSession = Depends(get_db)
):
"""Split text into chunks"""
# Get file
result = await db.execute(
select(File).where(File.id == request.file_id, File.project_id == project_id)
)
file = result.scalar_one_or_none()
if not file:
raise NotFoundException("File", request.file_id)
# Process file
text = await process_file_by_type(file)
# Update file status
file.status = "processing"
await db.commit()
# Split text
kwargs = {"chunk_size": request.chunk_size, "overlap": request.overlap}
if request.method == "custom" and request.separator:
kwargs["separator"] = request.separator
splitter = get_splitter(request.method, **kwargs)
split_results = splitter.split(text)
# Save chunks
chunks = []
for chunk_data in split_results:
db_chunk = Chunk(
project_id=project_id,
file_id=file.id,
name=chunk_data.get("name", f"Chunk {chunk_data['index'] + 1}"),
content=chunk_data["content"],
word_count=chunk_data.get("word_count", len(chunk_data["content"].split()))
try:
# Get file
result = await db.execute(
select(File).where(File.id == request.file_id, File.project_id == project_id)
)
db.add(db_chunk)
chunks.append(db_chunk)
file = result.scalar_one_or_none()
if not file:
raise NotFoundException("File", request.file_id)
await db.commit()
# 记录开始处理
log_success(
"开始处理文件",
project_id=str(project_id),
file_id=str(file.id),
filename=file.filename,
method=request.method,
chunk_size=request.chunk_size,
overlap=request.overlap
)
# Update file status
file.status = "completed"
await db.commit()
# Process file
text = await process_file_by_type(file)
return ApiResponse.ok(
data={"chunks": len(chunks)},
message=f"Successfully split into {len(chunks)} chunks"
)
# Update file status
file.status = "processing"
await db.commit()
# Split text
kwargs = {"chunk_size": request.chunk_size, "overlap": request.overlap}
if request.method == "custom" and request.separator:
kwargs["separator"] = request.separator
splitter = get_splitter(request.method, **kwargs)
split_results = splitter.split(text)
# Save chunks
chunks = []
for chunk_data in split_results:
db_chunk = Chunk(
project_id=project_id,
file_id=file.id,
name=chunk_data.get("name", f"Chunk {chunk_data['index'] + 1}"),
content=chunk_data["content"],
word_count=chunk_data.get("word_count", len(chunk_data["content"].split()))
)
db.add(db_chunk)
chunks.append(db_chunk)
await db.commit()
# Save processed markdown to ready directory
ready_dir = get_project_ready_dir(str(project_id))
md_filename = f"{file.id}_{file.filename}.md"
md_path = ready_dir / md_filename
# Write markdown content to file
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: md_path.write_text(text, encoding='utf-8')
)
# Update file path to ready location
file.file_path = str(md_path)
file.status = "completed"
await db.commit()
# 记录成功日志
log_success(
"文件处理完成",
project_id=str(project_id),
file_id=str(file.id),
filename=file.filename,
chunk_count=len(chunks),
text_length=len(text),
ready_path=str(md_path)
)
return ApiResponse.ok(
data={"chunks": len(chunks)},
message=f"Successfully split into {len(chunks)} chunks"
)
except Exception as e:
# 记录失败日志
log_failure(
"文件处理失败",
project_id=str(project_id),
file_id=str(request.file_id),
error=str(e)
)
raise
@router.get("", response_model=ApiResponse)

View File

@@ -5,7 +5,7 @@ import os
import asyncio
from pathlib import Path
from typing import Optional
from uuid import UUID
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, UploadFile, File, Query
from fastapi.responses import FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
@@ -15,20 +15,31 @@ from app.core.config import get_settings
from app.core.database import get_db
from app.core.exceptions import ValidationException, NotFoundException
from app.core.crud import CRUDBase
from app.core.logging import log_success, log_failure
from app.models.models import File as FileModel
from app.schemas.file import FileResponse, FileCreateSchema
settings = get_settings()
router = APIRouter()
# Ensure upload directory exists
UPLOAD_DIR = Path(settings.UPLOAD_DIR)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Initialize CRUD
file_crud = CRUDBase(FileModel)
def get_project_raw_dir(project_id: str) -> Path:
"""获取项目的 raw 文件目录"""
base_dir = Path("/data/code/YG-Datasets/data") / project_id / "raw"
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
def get_project_ready_dir(project_id: str) -> Path:
"""获取项目的 ready 文件目录(处理后的文件)"""
base_dir = Path("/data/code/YG-Datasets/data") / project_id / "ready"
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
def get_file_type(filename: str) -> str:
"""Get file type from extension"""
ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
@@ -82,40 +93,62 @@ async def upload_file(
db: AsyncSession = Depends(get_db)
):
"""Upload a file"""
# Read file content for validation
content = await file.read()
file_size = len(content)
try:
# Read file content for validation
content = await file.read()
file_size = len(content)
# Validate file
validate_file(file.filename, file_size)
# Validate file
validate_file(file.filename, file_size)
# Save file to disk
safe_filename = f"{project_id}_{UUID.uuid4().hex[:8]}_{file.filename}"
file_path = UPLOAD_DIR / safe_filename
# Save file to disk - 使用项目 raw 目录
safe_filename = f"{uuid4().hex[:8]}_{file.filename}"
project_dir = get_project_raw_dir(str(project_id))
file_path = project_dir / safe_filename
# Write file asynchronously
await asyncio.get_event_loop().run_in_executor(
None,
lambda: file_path.write_bytes(content)
)
# Write file asynchronously
await asyncio.get_event_loop().run_in_executor(
None,
lambda: file_path.write_bytes(content)
)
# Create file record
db_file = FileModel(
project_id=project_id,
filename=file.filename,
file_type=get_file_type(file.filename),
file_path=str(file_path),
size=file_size,
status="pending"
)
db.add(db_file)
await db.commit()
await db.refresh(db_file)
# Create file record
db_file = FileModel(
project_id=project_id,
filename=file.filename,
file_type=get_file_type(file.filename),
file_path=str(file_path),
size=file_size,
status="pending"
)
db.add(db_file)
await db.commit()
await db.refresh(db_file)
return ApiResponse.ok(
data={"id": str(db_file.id), "filename": db_file.filename, "status": db_file.status},
message="File uploaded successfully"
)
# 记录成功日志
log_success(
"文件上传成功",
project_id=str(project_id),
file_id=str(db_file.id),
filename=file.filename,
file_type=db_file.file_type,
file_size=file_size,
file_path=str(file_path)
)
return ApiResponse.ok(
data={"id": str(db_file.id), "filename": db_file.filename, "status": db_file.status},
message="File uploaded successfully"
)
except Exception as e:
# 记录失败日志
log_failure(
"文件上传失败",
project_id=str(project_id),
filename=file.filename if 'file' in locals() else "unknown",
error=str(e)
)
raise
@router.get("", response_model=ApiResponse)

View File

@@ -4,8 +4,9 @@ Logging Configuration
"""
import logging
import sys
from datetime import datetime
from typing import Any
from logging.handlers import RotatingFileHandler
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path
from app.core.config import get_settings
@@ -15,6 +16,18 @@ settings = get_settings()
LOG_DIR = Path("./logs")
LOG_DIR.mkdir(exist_ok=True)
# 日期格式
LOG_DATE = datetime.now().strftime("%Y-%m-%d")
# 当天的日志目录
CURRENT_LOG_DIR = LOG_DIR / LOG_DATE
CURRENT_LOG_DIR.mkdir(exist_ok=True)
def get_log_path(filename: str) -> Path:
"""获取当天的日志文件路径"""
return CURRENT_LOG_DIR / filename
def setup_logging(name: str = "yg_dataset") -> logging.Logger:
"""Setup application logging"""
@@ -35,20 +48,21 @@ def setup_logging(name: str = "yg_dataset") -> logging.Logger:
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# File handler
file_handler = RotatingFileHandler(
LOG_DIR / f"{name}.log",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
# Main log file handler - app.log
main_file_handler = TimedRotatingFileHandler(
get_log_path("app.log"),
when="midnight",
interval=1,
backupCount=30,
encoding="utf-8"
)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter(
main_file_handler.setLevel(logging.INFO)
main_file_formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
main_file_handler.setFormatter(main_file_formatter)
logger.addHandler(main_file_handler)
return logger
@@ -57,6 +71,65 @@ def setup_logging(name: str = "yg_dataset") -> logging.Logger:
logger = setup_logging()
# ============== Success Logger ==============
def get_success_logger() -> logging.Logger:
"""获取成功日志记录器"""
success_logger = logging.getLogger("yg_dataset.success")
if not success_logger.handlers:
handler = RotatingFileHandler(
get_log_path("success.log"),
maxBytes=10 * 1024 * 1024,
backupCount=30,
encoding="utf-8"
)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
fmt="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
success_logger.addHandler(handler)
success_logger.setLevel(logging.INFO)
return success_logger
# ============== Failure Logger ==============
def get_failure_logger() -> logging.Logger:
"""获取失败日志记录器"""
failure_logger = logging.getLogger("yg_dataset.failure")
if not failure_logger.handlers:
handler = RotatingFileHandler(
get_log_path("failure.log"),
maxBytes=10 * 1024 * 1024,
backupCount=30,
encoding="utf-8"
)
handler.setLevel(logging.WARNING)
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
failure_logger.addHandler(handler)
failure_logger.setLevel(logging.WARNING)
return failure_logger
# ============== Convenience functions ==============
def log_success(message: str, **kwargs):
"""记录成功日志"""
extra_info = " | ".join([f"{k}={v}" for k, v in kwargs.items()]) if kwargs else ""
full_message = f"{message} | {extra_info}" if extra_info else message
get_success_logger().info(full_message)
def log_failure(message: str, **kwargs):
"""记录失败日志"""
extra_info = " | ".join([f"{k}={v}" for k, v in kwargs.items()]) if kwargs else ""
full_message = f"{message} | {extra_info}" if extra_info else message
get_failure_logger().warning(full_message)
class LoggerMixin:
"""Mixin to add logging capability to classes"""

View File

@@ -107,7 +107,7 @@ async def app_exception_handler(request: Request, exc: AppException):
content=ApiResponse.fail(
message=exc.message,
error={"code": exc.code, "details": exc.details}
).model_dump()
).model_dump(mode='json')
)
@@ -127,7 +127,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
content=ApiResponse.fail(
message="Validation error",
error={"code": "VALIDATION_ERROR", "details": {"errors": errors}}
).model_dump()
).model_dump(mode='json')
)
@@ -140,7 +140,7 @@ async def database_exception_handler(request: Request, exc: SQLAlchemyError):
content=ApiResponse.fail(
message="Database operation failed",
error={"code": "DATABASE_ERROR"}
).model_dump()
).model_dump(mode='json')
)
@@ -153,7 +153,7 @@ async def general_exception_handler(request: Request, exc: Exception):
content=ApiResponse.fail(
message="Internal server error",
error={"code": "INTERNAL_ERROR"}
).model_dump()
).model_dump(mode='json')
)