""" Chunks API Router """ import asyncio from typing import List, Optional from uuid import UUID from pydantic import BaseModel, Field from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.api.response import ApiResponse, PaginatedResponse from app.core.database import get_db from app.core.exceptions import NotFoundException from app.core.crud import CRUDBase from app.models.models import Chunk, File from app.schemas.chunk import ChunkResponse from app.schemas.chunk import ChunkCreateSchema from app.services.text_splitter.splitter import get_splitter from app.services.file_processor.pdf_processor import process_pdf from app.services.file_processor.docx_processor import process_docx from app.services.file_processor.excel_processor import process_csv, process_excel router = APIRouter() # Initialize CRUD chunk_crud = CRUDBase(Chunk) class SplitRequest(BaseModel): """Request model for splitting text""" file_id: UUID method: str = "recursive" chunk_size: int = Field(500, ge=50, le=5000) overlap: int = Field(50, ge=0, le=500) separator: Optional[str] = None async def process_file_by_type(file: File) -> str: """Process file based on its type""" if not file.file_path: raise NotFoundException("File", file.id) processors = { "pdf": process_pdf, "docx": process_docx, "xlsx": process_excel, "csv": process_csv, } processor = processors.get(file.file_type) if not processor: # Return raw text for txt, md files loop = asyncio.get_event_loop() content = await loop.run_in_executor( None, lambda: open(file.file_path, 'r', encoding='utf-8').read() ) return content return await processor(file.file_path) @router.post("/split", response_model=ApiResponse) async def split_text( project_id: UUID, request: SplitRequest, db: AsyncSession = Depends(get_db) ): """Split text into chunks""" # Get file result = await db.execute( select(File).where(File.id == request.file_id, File.project_id == project_id) ) file = result.scalar_one_or_none() if not file: raise NotFoundException("File", request.file_id) # Process file text = await process_file_by_type(file) # Update file status file.status = "processing" await db.commit() # Split text kwargs = {"chunk_size": request.chunk_size, "overlap": request.overlap} if request.method == "custom" and request.separator: kwargs["separator"] = request.separator splitter = get_splitter(request.method, **kwargs) split_results = splitter.split(text) # Save chunks chunks = [] for chunk_data in split_results: db_chunk = Chunk( project_id=project_id, file_id=file.id, name=chunk_data.get("name", f"Chunk {chunk_data['index'] + 1}"), content=chunk_data["content"], word_count=chunk_data.get("word_count", len(chunk_data["content"].split())) ) db.add(db_chunk) chunks.append(db_chunk) await db.commit() # Update file status file.status = "completed" await db.commit() return ApiResponse.ok( data={"chunks": len(chunks)}, message=f"Successfully split into {len(chunks)} chunks" ) @router.get("", response_model=ApiResponse) async def list_chunks( project_id: UUID, file_id: Optional[UUID] = Query(None), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db) ): """List chunks for a project""" filters = {"project_id": project_id} if file_id: filters["file_id"] = file_id skip = (page - 1) * page_size chunks, total = await chunk_crud.get_multi( db, skip=skip, limit=page_size, filters=filters, order_by="created_at", descending=True ) chunk_responses = [ChunkResponse.model_validate(c) for c in chunks] return PaginatedResponse.ok( items=chunk_responses, page=page, page_size=page_size, total=total ) @router.get("/{chunk_id}", response_model=ApiResponse) async def get_chunk( project_id: UUID, chunk_id: UUID, db: AsyncSession = Depends(get_db) ): """Get chunk by ID""" chunk = await chunk_crud.get(db, chunk_id) if not chunk or chunk.project_id != project_id: raise NotFoundException("Chunk", chunk_id) return ApiResponse.ok(data=ChunkResponse.model_validate(chunk)) @router.put("/{chunk_id}", response_model=ApiResponse) async def update_chunk( project_id: UUID, chunk_id: UUID, chunk: ChunkCreateSchema, db: AsyncSession = Depends(get_db) ): """Update chunk""" db_chunk = await chunk_crud.get(db, chunk_id) if not db_chunk or db_chunk.project_id != project_id: raise NotFoundException("Chunk", chunk_id) updated_chunk = await chunk_crud.update(db, db_chunk, chunk) return ApiResponse.ok( data=ChunkResponse.model_validate(updated_chunk), message="Chunk updated successfully" ) @router.delete("/{chunk_id}", response_model=ApiResponse) async def delete_chunk( project_id: UUID, chunk_id: UUID, db: AsyncSession = Depends(get_db) ): """Delete chunk""" chunk = await chunk_crud.get(db, chunk_id) if not chunk or chunk.project_id != project_id: raise NotFoundException("Chunk", chunk_id) await chunk_crud.delete(db, chunk_id) return ApiResponse.ok(message="Chunk deleted successfully")