89 lines
4.0 KiB
Python
89 lines
4.0 KiB
Python
"""
|
|
JWT аутентификация для WebSocket соединений.
|
|
"""
|
|
import logging
|
|
from urllib.parse import parse_qs
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from channels.middleware import BaseMiddleware
|
|
from channels.auth import AuthMiddlewareStack
|
|
from rest_framework_simplejwt.tokens import UntypedToken
|
|
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
|
|
from jwt import decode as jwt_decode
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
|
|
logger = logging.getLogger(__name__)
|
|
User = get_user_model()
|
|
|
|
|
|
class JWTAuthMiddleware(BaseMiddleware):
|
|
"""
|
|
Middleware для аутентификации WebSocket через JWT токены.
|
|
Извлекает токен из query параметров или заголовков.
|
|
"""
|
|
|
|
async def __call__(self, scope, receive, send):
|
|
try:
|
|
# Извлечь токен из query параметров
|
|
query_string = scope.get('query_string', b'').decode()
|
|
query_params = parse_qs(query_string)
|
|
token = query_params.get('token', [None])[0]
|
|
|
|
# Логируем для отладки
|
|
path = scope.get('path', '')
|
|
logger.info(f'[JWTAuthMiddleware] WebSocket connection attempt: path={path}, query_string={query_string[:100] if len(query_string) > 100 else query_string}, token_present={bool(token)}')
|
|
|
|
# Если токена нет в query, попробовать из headers
|
|
if not token:
|
|
headers = dict(scope.get('headers', []))
|
|
auth_header = headers.get(b'authorization', b'').decode()
|
|
if auth_header.startswith('Bearer '):
|
|
token = auth_header.split('Bearer ')[1]
|
|
logger.info(f'[JWTAuthMiddleware] Token found in Authorization header')
|
|
|
|
# Аутентификация пользователя
|
|
if token:
|
|
try:
|
|
# Валидация токена
|
|
UntypedToken(token)
|
|
decoded_data = jwt_decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
user_id = decoded_data.get('user_id')
|
|
|
|
if user_id:
|
|
user = await self.get_user(user_id)
|
|
scope['user'] = user
|
|
logger.info(f"[JWTAuthMiddleware] JWT auth successful for user {user_id} (path={path})")
|
|
else:
|
|
scope['user'] = AnonymousUser()
|
|
logger.warning(f"JWT token missing user_id (path={path})")
|
|
|
|
except (InvalidToken, TokenError) as e:
|
|
logger.warning(f"JWT token validation failed (path={path}): {e}")
|
|
scope['user'] = AnonymousUser()
|
|
except Exception as e:
|
|
logger.error(f"JWT auth error (path={path}): {e}", exc_info=True)
|
|
scope['user'] = AnonymousUser()
|
|
else:
|
|
logger.warning(f"No JWT token provided (path={path})")
|
|
scope['user'] = AnonymousUser()
|
|
|
|
return await super().__call__(scope, receive, send)
|
|
except Exception as e:
|
|
logger.error(f"Error in JWTAuthMiddleware: {e}", exc_info=True)
|
|
scope['user'] = AnonymousUser()
|
|
return await super().__call__(scope, receive, send)
|
|
|
|
@staticmethod
|
|
async def get_user(user_id):
|
|
"""Получить пользователя по ID."""
|
|
try:
|
|
return await User.objects.aget(id=user_id)
|
|
except User.DoesNotExist:
|
|
return AnonymousUser()
|
|
|
|
|
|
def JWTAuthMiddlewareStack(inner):
|
|
"""Обертка для использования JWT аутентификации в WebSocket."""
|
|
return JWTAuthMiddleware(AuthMiddlewareStack(inner))
|
|
|