# /home/ram/aparsoft/backend/apps/accounts/api/views/auth_views.py """ Enhanced Authentication Views for Aparsoft This module provides comprehensive authentication functionality including: - Role-based login with appropriate user context - Enhanced registration with automatic profile creation - User validation - Secure cookie-based session management - Profile completion workflows - Administrative user creation Key Features: 1. Automatic profile creation after registration based on user role 2. Enhanced security with proper error handling 3. Role-specific dashboard redirection after login 4. Integration with Aparsoft workflow """ from rest_framework import status from rest_framework.response import Response from django.utils import timezone from django.conf import settings from rest_framework import serializers from rest_framework.exceptions import AuthenticationFailed, ValidationError from rest_framework.permissions import IsAuthenticated, AllowAny from rest_framework.views import APIView from rest_framework_simplejwt.views import TokenObtainPairView from rest_framework_simplejwt.token_blacklist.models import OutstandingToken from django.db import transaction from rest_framework_simplejwt.tokens import RefreshToken from django.contrib.auth import get_user_model from django.contrib.auth.tokens import default_token_generator from django.utils.decorators import method_decorator from django.views.decorators.csrf import ensure_csrf_cookie, csrf_protect from django.http import JsonResponse from django.shortcuts import get_object_or_404 from django.utils.http import urlsafe_base64_decode, urlsafe_base64_encode from django.utils.encoding import force_str, force_bytes import logging from rest_framework.throttling import AnonRateThrottle from typing import Dict, Any from decouple import config # Import enhanced serializers from ..serializers import ( CustomTokenObtainPairSerializer, RegisterSerializer, ) # Import models from Aparsoft accounts from ...models import ( CustomUser, UserContact, DeveloperProfile, ClientProfile, ProjectManagerProfile, AccountManagerProfile, Team, ) # Import permissions from core.permissions import BaseAccessControl logger = logging.getLogger(__name__) User = get_user_model() @method_decorator([ensure_csrf_cookie], name="dispatch") class CustomTokenObtainPairView(TokenObtainPairView): """ Enhanced custom token view for Aparsoft. Features: - Role-based authentication with appropriate context - Automatic profile validation and creation - Enhanced security and error handling - Dashboard routing based on user role """ serializer_class = CustomTokenObtainPairSerializer def post(self, request, *args, **kwargs): # Request validation if not request.data.get("email") or not request.data.get("password"): return Response( { "message": "Email and password are required.", "code": "required_fields_missing", "status": "error", }, status=status.HTTP_400_BAD_REQUEST, ) # Pre-validate user exists and is active try: user = User.objects.get(email=request.data.get("email")) if not user.is_active: return Response( { "message": "Account is inactive. Please contact support.", "code": "account_inactive", "status": "error", }, status=status.HTTP_403_FORBIDDEN, ) # Check role_status for suspended/blocked/inactive users if hasattr(user, "role_status") and user.role_status in [ "suspended", "inactive", "blocked", ]: status_messages = { "suspended": "Account is suspended. Please contact support.", "inactive": "Account is inactive. Please contact support.", "blocked": "Account is blocked. Please contact support.", } return Response( { "message": status_messages.get( user.role_status, "Account access is restricted. Please contact support.", ), "code": f"account_{user.role_status}", "status": "error", }, status=status.HTTP_403_FORBIDDEN, ) except User.DoesNotExist: # Don't reveal that the user doesn't exist pass try: # Use enhanced serializer with role-specific context serializer = self.get_serializer(data=request.data) if not serializer.is_valid(): errors = serializer.errors # Handle specific authentication errors if "message" in errors: error_message = str(errors["message"]) if "No active account found" in error_message: return Response( { "message": "Invalid email or password.", "code": "invalid_credentials", "status": "error", }, status=status.HTTP_401_UNAUTHORIZED, ) elif "profile_not_found" in error_message: return Response( { "message": "User profile incomplete. Please contact support.", "code": "profile_incomplete", "status": "error", }, status=status.HTTP_400_BAD_REQUEST, ) if "non_field_errors" in errors: return Response( { "message": "Authentication failed. Please check your credentials.", "code": "authentication_failed", "status": "error", }, status=status.HTTP_401_UNAUTHORIZED, ) return Response( { "message": "Login validation failed", "code": "validation_error", "status": "error", "errors": errors, }, status=status.HTTP_400_BAD_REQUEST, ) # Extract validated data and user data = serializer.validated_data user = serializer.user # DEBUG: Log user role information logger.info(f"User login - Email: {user.email}, Role: {user.role}, is_client: {user.is_client}") # AUTO-CREATE DocAPIClient if user is a client and doesn't have one if user.is_client: from documentintelligence.models import DocAPIClient from documentintelligence.services.docapi_initializer_service import get_docapi_initializer_service # Check if client record exists client_exists = DocAPIClient.objects.filter(user=user).exists() logger.info(f"DocAPIClient exists for {user.email}: {client_exists}") if not client_exists: logger.info(f"Auto-creating DocAPIClient for user: {user.email}") try: service = get_docapi_initializer_service() # Sanitize username for client_name import re sanitized_username = re.sub(r'[^a-zA-Z0-9_]', '_', user.username) client_name = f"{sanitized_username}_{user.id}" result = service.create_client( company_name=user.get_full_name() or user.email or "Individual", client_name=client_name, user_identifier=user, plan_code="free", # Start with free plan description=f"Auto-created for {user.email}", auto_generate_key=True, activate_immediately=True, ) if result.success: # CRITICAL: Save client and API key in transaction with transaction.atomic(): result.client.save() if result.api_key: result.api_key.save() logger.info(f"✓ API Key created: {result.api_key.key_prefix}...") if result.webhook: result.webhook.save() logger.info(f"✓ DocAPIClient created successfully for {user.email}") logger.info(f" - Client ID: {result.client.id}") logger.info(f" - Client Name: {result.client.client_name}") logger.info(f" - Status: {result.client.status}") logger.info(f" - API Keys: {result.client.api_keys.count()}") else: logger.error(f"Failed to create DocAPIClient: {result.message}") except Exception as e: logger.error(f"Exception during DocAPIClient creation: {str(e)}", exc_info=True) else: logger.info(f"User {user.email} is not a client (role: {user.role}), skipping DocAPIClient creation") # Update user's login count and last active user.login_count += 1 user.last_active = timezone.now() user.save(update_fields=["login_count", "last_active"]) # Enhanced user data with role-specific context enhanced_user_data = self._get_enhanced_login_response(user, data) # Create response with enhanced data response = Response( { "message": "Login successful", "status": "success", "data": enhanced_user_data, }, status=status.HTTP_200_OK, ) # Set secure HTTP-only cookies self._set_auth_cookies(response, data) logger.info(f"Successful login for {user.role}: {user.email}") return response except AuthenticationFailed as auth_error: logger.debug(f"Authentication failed: {str(auth_error)}") return Response( { "message": "Invalid email or password.", "code": "invalid_credentials", "status": "error", }, status=status.HTTP_401_UNAUTHORIZED, ) except ValidationError as validation_error: logger.info(f"Validation error: {str(validation_error)}") return Response( { "message": str(validation_error), "code": "validation_error", "status": "error", }, status=status.HTTP_400_BAD_REQUEST, ) except Exception as e: logger.error("Unexpected login error:", exc_info=True) return Response( { "message": "An error occurred during login. Please try again.", "code": "server_error", "status": "error", }, status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) def _get_enhanced_login_response( self, user, token_data: Dict[str, Any] ) -> Dict[str, Any]: """Create enhanced login response with role-specific context.""" # Base user data from serializer user_data = token_data.get("user", {}) # Add organization context if applicable organization_data = None if user.client_organization: organization_data = { "id": user.client_organization.id, "name": user.client_organization.name, "organization_type": user.client_organization.organization_type, "subscription_tier": user.subscription_tier, } # Add role-specific data and dashboard routing role_data = {} dashboard_route = "/dashboard" if user.is_developer and hasattr(user, "developer_profile"): developer = user.developer_profile role_data = { "experience_level": developer.experience_level, "technical_expertise": developer.get_skills_summary(), "availability": developer.is_available, "utilization_rate": developer.utilization_rate, "team": developer.team.name if developer.team else None, } dashboard_route = "/platform/developer" elif user.is_client and hasattr(user, "client_profile"): client = user.client_profile role_data = { "client_type": client.client_type, "client_status": client.client_status, "industry_sector": client.industry_sector, "active_projects_count": client.active_projects_count, "account_manager": ( client.account_manager.full_name if client.account_manager else None ), "subscription_tier": user.subscription_tier, "onboarding_complete": client.client_status == "active", } dashboard_route = "/platform/client" elif user.is_project_manager and hasattr(user, "project_manager_profile"): pm = user.project_manager_profile role_data = { "experience_level": pm.experience_level, "primary_methodology": pm.primary_methodology, "active_projects_count": pm.active_projects_count, "utilization_percentage": pm.utilization_percentage, } dashboard_route = "/dashboard/project-manager" elif user.is_account_manager and hasattr(user, "account_manager_profile"): am = user.account_manager_profile role_data = { "experience_level": am.experience_level, "sales_focus": am.sales_focus, "active_clients_count": am.active_clients_count, "client_satisfaction_score": float(am.client_satisfaction_score), "pipeline_value": float(am.pipeline_value), } dashboard_route = "/dashboard/account-manager" elif user.role == "admin": role_data = { "admin_permissions": [ "manage_users", "view_system_analytics", "manage_settings", "billing_management", "user_creation", "system_configuration", "view_all_projects", "view_all_clients", ], "system_access": "full", "managed_resources": [ "users", "projects", "clients", "billing", "system", ], } dashboard_route = "/platform/admin" # Get user permissions permissions = self._get_user_permissions(user) # Check for profile completion requirements profile_completion = self._check_profile_completion(user) return { "tokens": { "access": token_data["access"], "refresh": token_data["refresh"], }, "user": { **user_data, "role_data": role_data, "permissions": permissions, "profile_completion": profile_completion, }, "organization": organization_data, "navigation": { "dashboard_route": dashboard_route, "next_action": self._get_next_action(user), }, "session_info": { "login_count": user.login_count, "last_login": user.last_login.isoformat() if user.last_login else None, "session_expires": ( timezone.now() + timezone.timedelta(hours=1) ).isoformat(), }, } def _set_auth_cookies(self, response: Response, token_data: Dict[str, Any]) -> None: """Set secure HTTP-only authentication cookies.""" cookie_settings = { "httponly": True, "samesite": "Lax", "secure": not settings.DEBUG, "path": "/", } # Set refresh token (7 days) response.set_cookie( "refresh_token", token_data["refresh"], max_age=7 * 24 * 60 * 60, **cookie_settings, ) # Set access token (1 hour) response.set_cookie( "access_token", token_data["access"], max_age=60 * 60, **cookie_settings ) # Set auth state (readable by JS) response.set_cookie( "auth_state", "authenticated", httponly=False, max_age=7 * 24 * 60 * 60, **{k: v for k, v in cookie_settings.items() if k != "httponly"}, ) def _get_user_permissions(self, user) -> list: """Get user permissions based on role and context.""" base_permissions = ["view_profile", "update_profile"] if user.is_developer: base_permissions.extend( [ "view_assigned_tasks", "update_task_status", "view_project_details", "track_time", "submit_code", "view_team_members", ] ) # Senior developers get additional permissions if user.is_senior_developer: base_permissions.extend( ["review_code", "assign_tasks", "view_project_analytics"] ) elif user.is_client: base_permissions.extend( [ "view_projects", "view_project_status", "create_support_tickets", "approve_deliverables", "access_resources", "view_invoices", ] ) # Business and enterprise clients get additional permissions if user.is_business or user.is_enterprise: base_permissions.extend( ["view_detailed_analytics", "api_access", "priority_support"] ) elif user.is_project_manager: base_permissions.extend( [ "create_projects", "manage_team", "assign_tasks", "view_analytics", "generate_reports", "manage_client_communication", "update_project_status", "manage_resources", ] ) elif user.is_account_manager: base_permissions.extend( [ "manage_clients", "view_sales_pipeline", "create_opportunities", "view_client_analytics", "generate_quotes", "manage_contracts", ] ) elif user.role == "admin": base_permissions.extend( [ "manage_users", "view_system_analytics", "manage_settings", "billing_management", "user_creation", "system_configuration", "view_all_projects", "view_all_clients", ] ) elif user.is_superuser: base_permissions.append("full_access") return base_permissions def _check_profile_completion(self, user) -> Dict[str, Any]: """Check if user profile is complete and what steps are needed.""" completion_status = { "is_complete": True, "missing_fields": [], "next_steps": [], } # Check basic profile fields if not user.first_name or not user.last_name: completion_status["is_complete"] = False completion_status["missing_fields"].append("name") completion_status["next_steps"].append("Complete your name") # Check email verification if not user.email_verified: completion_status["is_complete"] = False completion_status["missing_fields"].append("email_verification") completion_status["next_steps"].append("Verify your email address") # Check contact information try: contact = user.contact if not contact.country or not contact.city: completion_status["is_complete"] = False completion_status["missing_fields"].append("location") completion_status["next_steps"].append("Add your location") except UserContact.DoesNotExist: completion_status["is_complete"] = False completion_status["missing_fields"].append("contact") completion_status["next_steps"].append("Complete contact information") # Role-specific checks if user.is_developer and hasattr(user, "developer_profile"): developer = user.developer_profile if not developer.technical_expertise: completion_status["missing_fields"].append("technical_skills") completion_status["next_steps"].append("Add your technical skills") completion_status["is_complete"] = False if not developer.programming_languages: completion_status["missing_fields"].append("programming_languages") completion_status["next_steps"].append("Add your programming languages") completion_status["is_complete"] = False elif user.is_client and hasattr(user, "client_profile"): client = user.client_profile if client.client_status == "onboarding": completion_status["next_steps"].append("Complete onboarding process") completion_status["is_complete"] = False if not client.industry_sector: completion_status["missing_fields"].append("industry_sector") completion_status["next_steps"].append("Set your industry sector") completion_status["is_complete"] = False elif user.is_project_manager and hasattr(user, "project_manager_profile"): pm = user.project_manager_profile if not pm.methodologies: completion_status["missing_fields"].append("methodologies") completion_status["next_steps"].append( "Add your project management methodologies" ) completion_status["is_complete"] = False if not pm.domain_expertise: completion_status["missing_fields"].append("domain_expertise") completion_status["next_steps"].append("Add your domain expertise") completion_status["is_complete"] = False elif user.is_account_manager and hasattr(user, "account_manager_profile"): am = user.account_manager_profile if not am.industry_specializations: completion_status["missing_fields"].append("industry_specializations") completion_status["next_steps"].append( "Add your industry specializations" ) completion_status["is_complete"] = False if not am.solution_expertise: completion_status["missing_fields"].append("solution_expertise") completion_status["next_steps"].append("Add your solution expertise") completion_status["is_complete"] = False return completion_status def _get_next_action(self, user) -> str: """Determine the next recommended action for the user.""" if not user.email_verified: return "verify_email" if user.is_developer: if hasattr(user, "developer_profile"): developer = user.developer_profile if not developer.team: return "join_team" elif ( developer.project_history is None or len(developer.project_history) == 0 ): return "view_available_projects" return "view_tasks" elif user.is_client: if hasattr(user, "client_profile"): client = user.client_profile if client.client_status == "onboarding": return "complete_onboarding" elif client.active_projects_count == 0: return "explore_services" return "view_projects" elif user.is_project_manager: if hasattr(user, "project_manager_profile"): pm = user.project_manager_profile if pm.active_projects_count == 0: return "create_project" return "manage_projects" elif user.is_account_manager: if hasattr(user, "account_manager_profile"): am = user.account_manager_profile if am.active_clients_count == 0: return "add_clients" elif am.opportunities is None or len(am.opportunities) == 0: return "create_opportunity" return "manage_clients" elif user.role == "admin": return "system_overview" return "complete_profile" class LogoutView(APIView): """ Enhanced logout view with comprehensive session cleanup. Features: - Graceful handling of expired/invalid tokens - Complete cookie cleanup - Session blacklisting - Multi-device logout support """ permission_classes = [AllowAny] def post(self, request): try: refresh_token = request.data.get("refresh") or request.data.get( "refresh_token" ) all_devices = request.data.get("all_devices", False) # Handle logout with invalid/expired token if request.auth is None and request.user is None: logger.info("Processing logout with expired token") return self._create_logout_response( message="Session expired, cookies cleared", code="logout_expired_token", ) # Handle logout without refresh token if not refresh_token: logger.warning("No refresh token provided in logout") if hasattr(request, "user") and request.user.is_authenticated: request.user.update_last_active() return self._create_logout_response( message="Logged out successfully", code="logout_without_token" ) # Validate and blacklist token try: token = RefreshToken(refresh_token) # Verify token belongs to current user if ( hasattr(request, "user") and request.user.is_authenticated and token.payload.get("user_id") != request.user.id ): logger.warning(f"Token user mismatch during logout") return self._create_logout_response( message="Invalid token, but logged out", code="token_mismatch" ) # Blacklist the token token.blacklist() # Update user's last active if hasattr(request, "user") and request.user.is_authenticated: request.user.update_last_active() # Handle multi-device logout if ( all_devices and hasattr(request, "user") and request.user.is_authenticated ): logger.info(f"Logging out all devices for user: {request.user.id}") OutstandingToken.objects.filter(user=request.user).delete() logger.info( f"Successful logout for user: {getattr(request.user, 'email', 'Unknown')}" ) return self._create_logout_response( message="Successfully logged out", code="logout_success" ) except Exception as token_error: logger.warning( f"Token validation error during logout: {str(token_error)}" ) return self._create_logout_response( message="Invalid token, but logged out", code="invalid_token" ) except Exception as e: logger.error(f"Unexpected logout error: {str(e)}", exc_info=True) return self._create_logout_response( message="Error occurred, but logged out", code="error_but_logged_out" ) def _create_logout_response(self, message: str, code: str) -> Response: """Create logout response with cookie cleanup.""" response = Response( {"message": message, "code": code, "status": "success"}, status=status.HTTP_200_OK, ) # Clear all authentication cookies cookies_to_clear = ["auth_state", "access_token", "refresh_token", "csrftoken"] for cookie in cookies_to_clear: response.delete_cookie(cookie, path="/") return response