""" Document Processing Service Handles file upload processing, text extraction, chunking, and embedding creation. Designed to be called from Celery tasks for asynchronous processing. Usage: from apps.chatbot.services import DocumentProcessingService # Process uploaded document DocumentProcessingService.process_document( document_id=doc.id, user_id=user.id ) """ from typing import List, Dict, Any, Optional from uuid import UUID import mimetypes import os from django.core.files.uploadedfile import UploadedFile from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import ( PyPDFLoader, TextLoader, Docx2txtLoader, UnstructuredMarkdownLoader ) from apps.chatbot.models import UserDocument from apps.accounts.models import CustomUser from apps.chatbot.services.vector_storage_service import VectorStorageService class DocumentProcessingService: """Service for processing uploaded documents.""" # Supported file types SUPPORTED_TYPES = { 'application/pdf': 'pdf', 'text/plain': 'txt', 'text/markdown': 'md', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx', 'application/msword': 'doc' } @staticmethod def create_document_record( user: CustomUser, uploaded_file: UploadedFile, chat_session_id: Optional[UUID] = None, metadata: Optional[Dict[str, Any]] = None ) -> UserDocument: """ Create initial document record from uploaded file. Args: user: User uploading the file uploaded_file: Django UploadedFile object chat_session_id: Optional associated chat session metadata: Additional metadata Returns: Created UserDocument instance Example: doc = DocumentProcessingService.create_document_record( user=request.user, uploaded_file=request.FILES['document'], chat_session_id=session.id ) # Then trigger async processing process_document_task.delay(doc.id, user.id) """ # Validate file type file_type = uploaded_file.content_type if file_type not in DocumentProcessingService.SUPPORTED_TYPES: raise ValueError( f"Unsupported file type: {file_type}. " f"Supported: {', '.join(DocumentProcessingService.SUPPORTED_TYPES.values())}" ) # Create document record doc = UserDocument.objects.create( user=user, chat_session_id=chat_session_id, file=uploaded_file, file_name=uploaded_file.name, file_size=uploaded_file.size, file_type=file_type, processing_status='pending', metadata=metadata or {} ) return doc @staticmethod def load_document_text(file_path: str, file_type: str) -> str: """ Extract text from document based on file type. Args: file_path: Path to the file file_type: MIME type of the file Returns: Extracted text content Raises: ValueError: If file type not supported or loading fails """ try: # PDF if file_type == 'application/pdf': loader = PyPDFLoader(file_path) pages = loader.load() return "\n\n".join([page.page_content for page in pages]) # Plain text elif file_type == 'text/plain': loader = TextLoader(file_path) docs = loader.load() return docs[0].page_content # Markdown elif file_type == 'text/markdown': loader = UnstructuredMarkdownLoader(file_path) docs = loader.load() return docs[0].page_content # Word documents elif file_type in [ 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/msword' ]: loader = Docx2txtLoader(file_path) docs = loader.load() return docs[0].page_content else: raise ValueError(f"Unsupported file type: {file_type}") except Exception as e: raise ValueError(f"Failed to load document: {str(e)}") @staticmethod def chunk_text( text: str, chunk_size: int = 1000, chunk_overlap: int = 200, separators: Optional[List[str]] = None ) -> List[str]: """ Split text into chunks for embedding. Args: text: Text to chunk chunk_size: Size of each chunk (characters) chunk_overlap: Overlap between chunks separators: Custom separators (optional) Returns: List of text chunks Example: chunks = DocumentProcessingService.chunk_text( text=document_text, chunk_size=1000, chunk_overlap=200 ) """ # Default separators for better semantic chunking default_separators = [ "\n\n", # Paragraphs "\n", # Lines ". ", # Sentences ", ", # Clauses " ", # Words "" # Characters ] splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=separators or default_separators, length_function=len ) chunks = splitter.split_text(text) return chunks @staticmethod def process_document( document_id: UUID, user_id: UUID, chunk_size: int = 1000, chunk_overlap: int = 200 ) -> UserDocument: """ Complete document processing pipeline. This is typically called from a Celery task for async processing. Args: document_id: UserDocument ID user_id: User ID (for permission check) chunk_size: Size of text chunks chunk_overlap: Overlap between chunks Returns: Updated UserDocument instance Raises: Exception: If processing fails (stored in error_message) Example: # In Celery task @shared_task def process_document_task(document_id, user_id): return DocumentProcessingService.process_document( document_id=document_id, user_id=user_id ) """ # Get document and user doc = UserDocument.objects.get(id=document_id, user_id=user_id) user = CustomUser.objects.get(id=user_id) try: # Update status doc.processing_status = 'processing' doc.save() # Get file path file_path = doc.file.path # Extract text text = DocumentProcessingService.load_document_text( file_path=file_path, file_type=doc.file_type ) # Chunk text chunks = DocumentProcessingService.chunk_text( text=text, chunk_size=chunk_size, chunk_overlap=chunk_overlap ) # Store embeddings vector_ids = VectorStorageService.store_document_embeddings( document=doc, chunks=chunks, user=user ) # Document is already marked as completed in store_document_embeddings return doc except Exception as e: # Mark as failed doc.processing_status = 'failed' doc.error_message = str(e) doc.save() raise @staticmethod def reprocess_document( document_id: UUID, user_id: UUID, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None ) -> UserDocument: """ Reprocess a document with different settings. Args: document_id: UserDocument ID user_id: User ID chunk_size: New chunk size (optional) chunk_overlap: New overlap (optional) Returns: Updated UserDocument instance Example: # Reprocess with larger chunks doc = DocumentProcessingService.reprocess_document( document_id=doc.id, user_id=user.id, chunk_size=2000 ) """ doc = UserDocument.objects.get(id=document_id, user_id=user_id) user = CustomUser.objects.get(id=user_id) # Extract text file_path = doc.file.path text = DocumentProcessingService.load_document_text( file_path=file_path, file_type=doc.file_type ) # Chunk with new settings chunks = DocumentProcessingService.chunk_text( text=text, chunk_size=chunk_size or 1000, chunk_overlap=chunk_overlap or 200 ) # Reindex VectorStorageService.reindex_document( document=doc, new_chunks=chunks, user=user ) return doc @staticmethod def get_processing_status(document_id: UUID) -> Dict[str, Any]: """ Get processing status for a document. Args: document_id: UserDocument ID Returns: Status information dict Example: status = DocumentProcessingService.get_processing_status(doc.id) if status['status'] == 'completed': print(f"Created {status['chunk_count']} chunks") """ doc = UserDocument.objects.get(id=document_id) return { 'document_id': str(doc.id), 'file_name': doc.file_name, 'status': doc.processing_status, 'chunk_count': doc.chunk_count, 'has_embeddings': doc.has_embeddings, 'error_message': doc.error_message, 'created_at': doc.created_at, 'processed_at': doc.processed_at } @staticmethod def delete_document( document_id: UUID, user_id: UUID, delete_file: bool = True ) -> None: """ Delete document and its embeddings. Args: document_id: UserDocument ID user_id: User ID (for permission check) delete_file: Also delete the file from storage Example: DocumentProcessingService.delete_document( document_id=doc.id, user_id=user.id, delete_file=True ) """ doc = UserDocument.objects.get(id=document_id, user_id=user_id) # Delete embeddings first if doc.has_embeddings: VectorStorageService.delete_document_embeddings(doc) # Delete file if requested if delete_file and doc.file: file_path = doc.file.path if os.path.exists(file_path): os.remove(file_path) # Delete database record doc.delete() @staticmethod def get_document_preview( document_id: UUID, max_length: int = 500 ) -> str: """ Get preview of document content. Args: document_id: UserDocument ID max_length: Max characters to return Returns: Preview text Example: preview = DocumentProcessingService.get_document_preview( document_id=doc.id, max_length=200 ) """ doc = UserDocument.objects.get(id=document_id) if not doc.file: return "" try: text = DocumentProcessingService.load_document_text( file_path=doc.file.path, file_type=doc.file_type ) if len(text) > max_length: return text[:max_length] + "..." return text except: return "Preview not available" @staticmethod def validate_file( uploaded_file: UploadedFile, max_size_mb: int = 10 ) -> Dict[str, Any]: """ Validate uploaded file before processing. Args: uploaded_file: Django UploadedFile object max_size_mb: Maximum file size in MB Returns: Validation result dict Example: result = DocumentProcessingService.validate_file( uploaded_file=request.FILES['document'], max_size_mb=10 ) if not result['valid']: return Response(result, status=400) """ errors = [] # Check file type file_type = uploaded_file.content_type if file_type not in DocumentProcessingService.SUPPORTED_TYPES: errors.append( f"Unsupported file type: {file_type}" ) # Check file size max_bytes = max_size_mb * 1024 * 1024 if uploaded_file.size > max_bytes: errors.append( f"File too large: {uploaded_file.size / (1024*1024):.2f}MB. " f"Maximum: {max_size_mb}MB" ) # Check file name if not uploaded_file.name: errors.append("File name is required") return { 'valid': len(errors) == 0, 'errors': errors, 'file_name': uploaded_file.name, 'file_type': file_type, 'file_size_mb': uploaded_file.size / (1024 * 1024) }