diff --git a/backend/app/api/v1/__init__.py b/backend/app/api/v1/__init__.py index c41ec91..ecb1107 100644 --- a/backend/app/api/v1/__init__.py +++ b/backend/app/api/v1/__init__.py @@ -10,9 +10,11 @@ api_router = APIRouter() # Include sub-routers api_router.include_router(projects.router, prefix="/projects", tags=["projects"]) -api_router.include_router(files.router, prefix="/files", tags=["files"]) -api_router.include_router(chunks.router, prefix="/chunks", tags=["chunks"]) -api_router.include_router(questions.router, prefix="/questions", tags=["questions"]) -api_router.include_router(datasets.router, prefix="/datasets", tags=["datasets"]) -api_router.include_router(eval.router, prefix="/eval", tags=["eval"]) +# files, chunks, questions, datasets, eval 需要嵌套在 projects 下 +# 通过 projects 路由中的子路由处理 +api_router.include_router(files.router, prefix="/projects/{project_id}/files", tags=["files"]) +api_router.include_router(chunks.router, prefix="/projects/{project_id}/chunks", tags=["chunks"]) +api_router.include_router(questions.router, prefix="/projects/{project_id}/questions", tags=["questions"]) +api_router.include_router(datasets.router, prefix="/projects/{project_id}/datasets", tags=["datasets"]) +api_router.include_router(eval.router, prefix="/projects/{project_id}/eval", tags=["eval"]) api_router.include_router(models.router, prefix="/models", tags=["models"]) diff --git a/backend/app/api/v1/chunks/__init__.py b/backend/app/api/v1/chunks/__init__.py index 04ed245..590a084 100644 --- a/backend/app/api/v1/chunks/__init__.py +++ b/backend/app/api/v1/chunks/__init__.py @@ -2,6 +2,7 @@ Chunks API Router """ import asyncio +from pathlib import Path from typing import List, Optional from uuid import UUID from pydantic import BaseModel, Field @@ -13,19 +14,28 @@ from app.api.response import ApiResponse, PaginatedResponse from app.core.database import get_db from app.core.exceptions import NotFoundException from app.core.crud import CRUDBase +from app.core.logging import log_success, log_failure from app.models.models import Chunk, File from app.schemas.chunk import ChunkResponse from app.schemas.chunk import ChunkCreateSchema from app.services.text_splitter.splitter import get_splitter -from app.services.file_processor.pdf_processor import process_pdf -from app.services.file_processor.docx_processor import process_docx -from app.services.file_processor.excel_processor import process_csv, process_excel +from markitdown import MarkItDown router = APIRouter() # Initialize CRUD chunk_crud = CRUDBase(Chunk) +# Initialize markitdown +markitdown = MarkItDown() + + +def get_project_ready_dir(project_id: str) -> Path: + """获取项目的 ready 文件目录""" + base_dir = Path("/data/code/YG-Datasets/data") / project_id / "ready" + base_dir.mkdir(parents=True, exist_ok=True) + return base_dir + class SplitRequest(BaseModel): """Request model for splitting text""" @@ -37,28 +47,29 @@ class SplitRequest(BaseModel): async def process_file_by_type(file: File) -> str: - """Process file based on its type""" + """Process file based on its type, convert to markdown""" if not file.file_path: raise NotFoundException("File", file.id) - processors = { - "pdf": process_pdf, - "docx": process_docx, - "xlsx": process_excel, - "csv": process_csv, - } + # Supported types for markitdown + markitdown_types = ["pdf", "docx", "doc", "pptx", "ppt", "xlsx", "xls", "htm", "html"] - processor = processors.get(file.file_type) - if not processor: - # Return raw text for txt, md files + if file.file_type in markitdown_types: + # Use markitdown to convert to markdown loop = asyncio.get_event_loop() - content = await loop.run_in_executor( + result = await loop.run_in_executor( None, - lambda: open(file.file_path, 'r', encoding='utf-8').read() + lambda: markitdown.convert(file.file_path) ) - return content + return result.text_content - return await processor(file.file_path) + # Return raw text for txt, md files + loop = asyncio.get_event_loop() + content = await loop.run_in_executor( + None, + lambda: open(file.file_path, 'r', encoding='utf-8').read() + ) + return content @router.post("/split", response_model=ApiResponse) @@ -68,52 +79,97 @@ async def split_text( db: AsyncSession = Depends(get_db) ): """Split text into chunks""" - # Get file - result = await db.execute( - select(File).where(File.id == request.file_id, File.project_id == project_id) - ) - file = result.scalar_one_or_none() - if not file: - raise NotFoundException("File", request.file_id) - - # Process file - text = await process_file_by_type(file) - - # Update file status - file.status = "processing" - await db.commit() - - # Split text - kwargs = {"chunk_size": request.chunk_size, "overlap": request.overlap} - if request.method == "custom" and request.separator: - kwargs["separator"] = request.separator - - splitter = get_splitter(request.method, **kwargs) - split_results = splitter.split(text) - - # Save chunks - chunks = [] - for chunk_data in split_results: - db_chunk = Chunk( - project_id=project_id, - file_id=file.id, - name=chunk_data.get("name", f"Chunk {chunk_data['index'] + 1}"), - content=chunk_data["content"], - word_count=chunk_data.get("word_count", len(chunk_data["content"].split())) + try: + # Get file + result = await db.execute( + select(File).where(File.id == request.file_id, File.project_id == project_id) ) - db.add(db_chunk) - chunks.append(db_chunk) + file = result.scalar_one_or_none() + if not file: + raise NotFoundException("File", request.file_id) - await db.commit() + # 记录开始处理 + log_success( + "开始处理文件", + project_id=str(project_id), + file_id=str(file.id), + filename=file.filename, + method=request.method, + chunk_size=request.chunk_size, + overlap=request.overlap + ) - # Update file status - file.status = "completed" - await db.commit() + # Process file + text = await process_file_by_type(file) - return ApiResponse.ok( - data={"chunks": len(chunks)}, - message=f"Successfully split into {len(chunks)} chunks" - ) + # Update file status + file.status = "processing" + await db.commit() + + # Split text + kwargs = {"chunk_size": request.chunk_size, "overlap": request.overlap} + if request.method == "custom" and request.separator: + kwargs["separator"] = request.separator + + splitter = get_splitter(request.method, **kwargs) + split_results = splitter.split(text) + + # Save chunks + chunks = [] + for chunk_data in split_results: + db_chunk = Chunk( + project_id=project_id, + file_id=file.id, + name=chunk_data.get("name", f"Chunk {chunk_data['index'] + 1}"), + content=chunk_data["content"], + word_count=chunk_data.get("word_count", len(chunk_data["content"].split())) + ) + db.add(db_chunk) + chunks.append(db_chunk) + + await db.commit() + + # Save processed markdown to ready directory + ready_dir = get_project_ready_dir(str(project_id)) + md_filename = f"{file.id}_{file.filename}.md" + md_path = ready_dir / md_filename + + # Write markdown content to file + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + lambda: md_path.write_text(text, encoding='utf-8') + ) + + # Update file path to ready location + file.file_path = str(md_path) + file.status = "completed" + await db.commit() + + # 记录成功日志 + log_success( + "文件处理完成", + project_id=str(project_id), + file_id=str(file.id), + filename=file.filename, + chunk_count=len(chunks), + text_length=len(text), + ready_path=str(md_path) + ) + + return ApiResponse.ok( + data={"chunks": len(chunks)}, + message=f"Successfully split into {len(chunks)} chunks" + ) + except Exception as e: + # 记录失败日志 + log_failure( + "文件处理失败", + project_id=str(project_id), + file_id=str(request.file_id), + error=str(e) + ) + raise @router.get("", response_model=ApiResponse) diff --git a/backend/app/api/v1/files/__init__.py b/backend/app/api/v1/files/__init__.py index ccf46c7..15de0d6 100644 --- a/backend/app/api/v1/files/__init__.py +++ b/backend/app/api/v1/files/__init__.py @@ -5,7 +5,7 @@ import os import asyncio from pathlib import Path from typing import Optional -from uuid import UUID +from uuid import UUID, uuid4 from fastapi import APIRouter, Depends, UploadFile, File, Query from fastapi.responses import FileResponse from sqlalchemy.ext.asyncio import AsyncSession @@ -15,20 +15,31 @@ from app.core.config import get_settings from app.core.database import get_db from app.core.exceptions import ValidationException, NotFoundException from app.core.crud import CRUDBase +from app.core.logging import log_success, log_failure from app.models.models import File as FileModel from app.schemas.file import FileResponse, FileCreateSchema settings = get_settings() router = APIRouter() -# Ensure upload directory exists -UPLOAD_DIR = Path(settings.UPLOAD_DIR) -UPLOAD_DIR.mkdir(parents=True, exist_ok=True) - # Initialize CRUD file_crud = CRUDBase(FileModel) +def get_project_raw_dir(project_id: str) -> Path: + """获取项目的 raw 文件目录""" + base_dir = Path("/data/code/YG-Datasets/data") / project_id / "raw" + base_dir.mkdir(parents=True, exist_ok=True) + return base_dir + + +def get_project_ready_dir(project_id: str) -> Path: + """获取项目的 ready 文件目录(处理后的文件)""" + base_dir = Path("/data/code/YG-Datasets/data") / project_id / "ready" + base_dir.mkdir(parents=True, exist_ok=True) + return base_dir + + def get_file_type(filename: str) -> str: """Get file type from extension""" ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else '' @@ -82,40 +93,62 @@ async def upload_file( db: AsyncSession = Depends(get_db) ): """Upload a file""" - # Read file content for validation - content = await file.read() - file_size = len(content) + try: + # Read file content for validation + content = await file.read() + file_size = len(content) - # Validate file - validate_file(file.filename, file_size) + # Validate file + validate_file(file.filename, file_size) - # Save file to disk - safe_filename = f"{project_id}_{UUID.uuid4().hex[:8]}_{file.filename}" - file_path = UPLOAD_DIR / safe_filename + # Save file to disk - 使用项目 raw 目录 + safe_filename = f"{uuid4().hex[:8]}_{file.filename}" + project_dir = get_project_raw_dir(str(project_id)) + file_path = project_dir / safe_filename - # Write file asynchronously - await asyncio.get_event_loop().run_in_executor( - None, - lambda: file_path.write_bytes(content) - ) + # Write file asynchronously + await asyncio.get_event_loop().run_in_executor( + None, + lambda: file_path.write_bytes(content) + ) - # Create file record - db_file = FileModel( - project_id=project_id, - filename=file.filename, - file_type=get_file_type(file.filename), - file_path=str(file_path), - size=file_size, - status="pending" - ) - db.add(db_file) - await db.commit() - await db.refresh(db_file) + # Create file record + db_file = FileModel( + project_id=project_id, + filename=file.filename, + file_type=get_file_type(file.filename), + file_path=str(file_path), + size=file_size, + status="pending" + ) + db.add(db_file) + await db.commit() + await db.refresh(db_file) - return ApiResponse.ok( - data={"id": str(db_file.id), "filename": db_file.filename, "status": db_file.status}, - message="File uploaded successfully" - ) + # 记录成功日志 + log_success( + "文件上传成功", + project_id=str(project_id), + file_id=str(db_file.id), + filename=file.filename, + file_type=db_file.file_type, + file_size=file_size, + file_path=str(file_path) + ) + + return ApiResponse.ok( + data={"id": str(db_file.id), "filename": db_file.filename, "status": db_file.status}, + message="File uploaded successfully" + ) + except Exception as e: + # 记录失败日志 + log_failure( + "文件上传失败", + project_id=str(project_id), + filename=file.filename if 'file' in locals() else "unknown", + error=str(e) + ) + raise @router.get("", response_model=ApiResponse) diff --git a/backend/app/core/logging.py b/backend/app/core/logging.py index f4e737e..89362a8 100644 --- a/backend/app/core/logging.py +++ b/backend/app/core/logging.py @@ -4,8 +4,9 @@ Logging Configuration """ import logging import sys +from datetime import datetime from typing import Any -from logging.handlers import RotatingFileHandler +from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler from pathlib import Path from app.core.config import get_settings @@ -15,6 +16,18 @@ settings = get_settings() LOG_DIR = Path("./logs") LOG_DIR.mkdir(exist_ok=True) +# 日期格式 +LOG_DATE = datetime.now().strftime("%Y-%m-%d") + +# 当天的日志目录 +CURRENT_LOG_DIR = LOG_DIR / LOG_DATE +CURRENT_LOG_DIR.mkdir(exist_ok=True) + + +def get_log_path(filename: str) -> Path: + """获取当天的日志文件路径""" + return CURRENT_LOG_DIR / filename + def setup_logging(name: str = "yg_dataset") -> logging.Logger: """Setup application logging""" @@ -35,20 +48,21 @@ def setup_logging(name: str = "yg_dataset") -> logging.Logger: console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) - # File handler - file_handler = RotatingFileHandler( - LOG_DIR / f"{name}.log", - maxBytes=10 * 1024 * 1024, # 10MB - backupCount=5, + # Main log file handler - app.log + main_file_handler = TimedRotatingFileHandler( + get_log_path("app.log"), + when="midnight", + interval=1, + backupCount=30, encoding="utf-8" ) - file_handler.setLevel(logging.INFO) - file_formatter = logging.Formatter( + main_file_handler.setLevel(logging.INFO) + main_file_formatter = logging.Formatter( fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d | %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) - file_handler.setFormatter(file_formatter) - logger.addHandler(file_handler) + main_file_handler.setFormatter(main_file_formatter) + logger.addHandler(main_file_handler) return logger @@ -57,6 +71,65 @@ def setup_logging(name: str = "yg_dataset") -> logging.Logger: logger = setup_logging() +# ============== Success Logger ============== +def get_success_logger() -> logging.Logger: + """获取成功日志记录器""" + success_logger = logging.getLogger("yg_dataset.success") + if not success_logger.handlers: + handler = RotatingFileHandler( + get_log_path("success.log"), + maxBytes=10 * 1024 * 1024, + backupCount=30, + encoding="utf-8" + ) + handler.setLevel(logging.INFO) + formatter = logging.Formatter( + fmt="%(asctime)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + handler.setFormatter(formatter) + success_logger.addHandler(handler) + success_logger.setLevel(logging.INFO) + return success_logger + + +# ============== Failure Logger ============== +def get_failure_logger() -> logging.Logger: + """获取失败日志记录器""" + failure_logger = logging.getLogger("yg_dataset.failure") + if not failure_logger.handlers: + handler = RotatingFileHandler( + get_log_path("failure.log"), + maxBytes=10 * 1024 * 1024, + backupCount=30, + encoding="utf-8" + ) + handler.setLevel(logging.WARNING) + formatter = logging.Formatter( + fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + handler.setFormatter(formatter) + failure_logger.addHandler(handler) + failure_logger.setLevel(logging.WARNING) + return failure_logger + + +# ============== Convenience functions ============== +def log_success(message: str, **kwargs): + """记录成功日志""" + extra_info = " | ".join([f"{k}={v}" for k, v in kwargs.items()]) if kwargs else "" + full_message = f"{message} | {extra_info}" if extra_info else message + get_success_logger().info(full_message) + + +def log_failure(message: str, **kwargs): + """记录失败日志""" + extra_info = " | ".join([f"{k}={v}" for k, v in kwargs.items()]) if kwargs else "" + full_message = f"{message} | {extra_info}" if extra_info else message + get_failure_logger().warning(full_message) + + class LoggerMixin: """Mixin to add logging capability to classes""" diff --git a/backend/app/main.py b/backend/app/main.py index 7310ba7..e3f2cf1 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -107,7 +107,7 @@ async def app_exception_handler(request: Request, exc: AppException): content=ApiResponse.fail( message=exc.message, error={"code": exc.code, "details": exc.details} - ).model_dump() + ).model_dump(mode='json') ) @@ -127,7 +127,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE content=ApiResponse.fail( message="Validation error", error={"code": "VALIDATION_ERROR", "details": {"errors": errors}} - ).model_dump() + ).model_dump(mode='json') ) @@ -140,7 +140,7 @@ async def database_exception_handler(request: Request, exc: SQLAlchemyError): content=ApiResponse.fail( message="Database operation failed", error={"code": "DATABASE_ERROR"} - ).model_dump() + ).model_dump(mode='json') ) @@ -153,7 +153,7 @@ async def general_exception_handler(request: Request, exc: Exception): content=ApiResponse.fail( message="Internal server error", error={"code": "INTERNAL_ERROR"} - ).model_dump() + ).model_dump(mode='json') )