""" JWT аутентификация для WebSocket соединений. """ import logging from urllib.parse import parse_qs from django.contrib.auth.models import AnonymousUser from channels.middleware import BaseMiddleware from channels.auth import AuthMiddlewareStack from rest_framework_simplejwt.tokens import UntypedToken from rest_framework_simplejwt.exceptions import InvalidToken, TokenError from jwt import decode as jwt_decode from django.conf import settings from django.contrib.auth import get_user_model logger = logging.getLogger(__name__) User = get_user_model() class JWTAuthMiddleware(BaseMiddleware): """ Middleware для аутентификации WebSocket через JWT токены. Извлекает токен из query параметров или заголовков. """ async def __call__(self, scope, receive, send): try: # Извлечь токен из query параметров query_string = scope.get('query_string', b'').decode() query_params = parse_qs(query_string) token = query_params.get('token', [None])[0] # Логируем для отладки path = scope.get('path', '') logger.info(f'[JWTAuthMiddleware] WebSocket connection attempt: path={path}, query_string={query_string[:100] if len(query_string) > 100 else query_string}, token_present={bool(token)}') # Если токена нет в query, попробовать из headers if not token: headers = dict(scope.get('headers', [])) auth_header = headers.get(b'authorization', b'').decode() if auth_header.startswith('Bearer '): token = auth_header.split('Bearer ')[1] logger.info(f'[JWTAuthMiddleware] Token found in Authorization header') # Аутентификация пользователя if token: try: # Валидация токена UntypedToken(token) decoded_data = jwt_decode(token, settings.SECRET_KEY, algorithms=["HS256"]) user_id = decoded_data.get('user_id') if user_id: user = await self.get_user(user_id) scope['user'] = user logger.info(f"[JWTAuthMiddleware] JWT auth successful for user {user_id} (path={path})") else: scope['user'] = AnonymousUser() logger.warning(f"JWT token missing user_id (path={path})") except (InvalidToken, TokenError) as e: logger.warning(f"JWT token validation failed (path={path}): {e}") scope['user'] = AnonymousUser() except Exception as e: logger.error(f"JWT auth error (path={path}): {e}", exc_info=True) scope['user'] = AnonymousUser() else: logger.warning(f"No JWT token provided (path={path})") scope['user'] = AnonymousUser() return await super().__call__(scope, receive, send) except Exception as e: logger.error(f"Error in JWTAuthMiddleware: {e}", exc_info=True) scope['user'] = AnonymousUser() return await super().__call__(scope, receive, send) @staticmethod async def get_user(user_id): """Получить пользователя по ID.""" try: return await User.objects.aget(id=user_id) except User.DoesNotExist: return AnonymousUser() def JWTAuthMiddlewareStack(inner): """Обертка для использования JWT аутентификации в WebSocket.""" return JWTAuthMiddleware(AuthMiddlewareStack(inner))