323 lines
13 KiB
Python
323 lines
13 KiB
Python
"""
|
||
WebSocket consumers для чата.
|
||
"""
|
||
import json
|
||
from channels.generic.websocket import AsyncWebsocketConsumer
|
||
from channels.db import database_sync_to_async
|
||
from django.utils import timezone
|
||
from django.contrib.auth import get_user_model
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
User = get_user_model()
|
||
|
||
|
||
class ChatConsumer(AsyncWebsocketConsumer):
|
||
"""
|
||
WebSocket consumer для чата.
|
||
|
||
URL: ws://domain/ws/chat/{chat_uuid}/
|
||
"""
|
||
|
||
async def connect(self):
|
||
"""Подключение к WebSocket."""
|
||
try:
|
||
self.chat_uuid = self.scope['url_route']['kwargs']['chat_uuid']
|
||
self.room_group_name = f'chat_{self.chat_uuid}'
|
||
self.user = self.scope['user']
|
||
|
||
logger.info(f"WebSocket connection attempt: user={self.user}, chat_uuid={self.chat_uuid}, authenticated={self.user.is_authenticated if self.user else False}")
|
||
|
||
# Проверяем авторизацию
|
||
if not self.user or not self.user.is_authenticated:
|
||
logger.warning(f"WebSocket connection rejected: user not authenticated")
|
||
await self.close(code=4001) # Unauthorized
|
||
return
|
||
|
||
# Проверяем доступ к чату
|
||
has_access = await self.check_chat_access()
|
||
if not has_access:
|
||
logger.warning(f"WebSocket connection rejected: user {self.user.id} has no access to chat {self.chat_uuid}")
|
||
await self.close(code=4003) # Forbidden
|
||
return
|
||
|
||
# Проверяем что channel_layer доступен
|
||
if not self.channel_layer:
|
||
logger.error("Channel layer not available")
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
# Присоединяемся к группе чата
|
||
try:
|
||
await self.channel_layer.group_add(
|
||
self.room_group_name,
|
||
self.channel_name
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Error adding to channel group: {e}", exc_info=True)
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
await self.accept()
|
||
|
||
# Отправляем подтверждение подключения
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'connection_established',
|
||
'message': 'Подключено к чату',
|
||
'chat_uuid': self.chat_uuid
|
||
}))
|
||
|
||
# Уведомляем других участников
|
||
await self.channel_layer.group_send(
|
||
self.room_group_name,
|
||
{
|
||
'type': 'user_joined',
|
||
'user_id': self.user.id,
|
||
'username': self.user.get_full_name()
|
||
}
|
||
)
|
||
|
||
logger.info(f"User {self.user.id} connected to chat {self.chat_uuid}")
|
||
except Exception as e:
|
||
logger.error(f"Error in WebSocket connect: {e}", exc_info=True)
|
||
await self.close(code=1011) # Internal Error
|
||
|
||
async def disconnect(self, close_code):
|
||
"""Отключение от WebSocket."""
|
||
# Уведомляем других участников
|
||
await self.channel_layer.group_send(
|
||
self.room_group_name,
|
||
{
|
||
'type': 'user_left',
|
||
'user_id': self.user.id,
|
||
'username': self.user.get_full_name()
|
||
}
|
||
)
|
||
|
||
# Покидаем группу
|
||
await self.channel_layer.group_discard(
|
||
self.room_group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
logger.info(f"User {self.user.id} disconnected from chat {self.chat_uuid}")
|
||
|
||
async def receive(self, text_data):
|
||
"""Получение сообщения от клиента."""
|
||
try:
|
||
data = json.loads(text_data)
|
||
message_type = data.get('type')
|
||
|
||
if message_type == 'chat_message':
|
||
# Отправляем сообщение в группу
|
||
await self.channel_layer.group_send(
|
||
self.room_group_name,
|
||
{
|
||
'type': 'chat_message',
|
||
'message': data.get('content'),
|
||
'reply_to': data.get('reply_to'),
|
||
'user_id': self.user.id,
|
||
'username': self.user.get_full_name()
|
||
}
|
||
)
|
||
elif message_type == 'typing':
|
||
# Отправляем статус печати в группу
|
||
await self.channel_layer.group_send(
|
||
self.room_group_name,
|
||
{
|
||
'type': 'user_typing',
|
||
'user_id': self.user.id,
|
||
'username': self.user.get_full_name(),
|
||
'is_typing': data.get('is_typing', False)
|
||
}
|
||
)
|
||
elif message_type == 'read_messages':
|
||
# Отправляем информацию о прочитанных сообщениях
|
||
await self.channel_layer.group_send(
|
||
self.room_group_name,
|
||
{
|
||
'type': 'message_read',
|
||
'user_id': self.user.id,
|
||
'message_uuids': data.get('message_uuids', [])
|
||
}
|
||
)
|
||
except json.JSONDecodeError:
|
||
logger.error("Invalid JSON received")
|
||
except Exception as e:
|
||
logger.error(f"Error processing message: {e}", exc_info=True)
|
||
|
||
async def chat_message(self, event):
|
||
"""Отправка сообщения клиенту."""
|
||
# Это сообщение уже обработано в модели Message, просто уведомляем клиента
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'chat_message',
|
||
'message': event.get('message')
|
||
}))
|
||
|
||
async def user_typing(self, event):
|
||
"""Отправка статуса печати клиенту."""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'user_typing',
|
||
'user_id': event.get('user_id'),
|
||
'username': event.get('username'),
|
||
'is_typing': event.get('is_typing', False)
|
||
}))
|
||
|
||
async def message_read(self, event):
|
||
"""Отправка информации о прочитанных сообщениях."""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'message_read',
|
||
'user_id': event.get('user_id'),
|
||
'message_uuids': event.get('message_uuids', [])
|
||
}))
|
||
|
||
async def user_joined(self, event):
|
||
"""Уведомление о присоединении пользователя."""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'user_joined',
|
||
'user_id': event.get('user_id'),
|
||
'username': event.get('username')
|
||
}))
|
||
|
||
async def user_left(self, event):
|
||
"""Уведомление об уходе пользователя."""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'user_left',
|
||
'user_id': event.get('user_id'),
|
||
'username': event.get('username')
|
||
}))
|
||
|
||
@database_sync_to_async
|
||
def check_chat_access(self):
|
||
"""Проверка доступа пользователя к чату."""
|
||
from .models import Chat, ChatParticipant
|
||
|
||
try:
|
||
chat = Chat.objects.get(uuid=self.chat_uuid)
|
||
return ChatParticipant.objects.filter(chat=chat, user=self.user).exists()
|
||
except Chat.DoesNotExist:
|
||
return False
|
||
|
||
|
||
class UserPresenceConsumer(AsyncWebsocketConsumer):
|
||
"""
|
||
WebSocket consumer для отслеживания статусов пользователей (онлайн/оффлайн).
|
||
|
||
URL: ws://domain/ws/presence/
|
||
"""
|
||
|
||
async def connect(self):
|
||
"""Подключение к WebSocket."""
|
||
try:
|
||
self.user = self.scope['user']
|
||
self.presence_group_name = 'user_presence'
|
||
|
||
logger.info(f"Presence WebSocket connection attempt: user={self.user}, authenticated={self.user.is_authenticated if self.user else False}")
|
||
|
||
# Проверяем авторизацию
|
||
if not self.user or not self.user.is_authenticated:
|
||
logger.warning(f"Presence WebSocket connection rejected: user not authenticated")
|
||
await self.close(code=4001) # Unauthorized
|
||
return
|
||
|
||
# Проверяем что channel_layer доступен
|
||
if not self.channel_layer:
|
||
logger.error("Channel layer not available")
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
# Присоединяемся к группе присутствия
|
||
try:
|
||
await self.channel_layer.group_add(
|
||
self.presence_group_name,
|
||
self.channel_name
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Error adding to presence channel group: {e}", exc_info=True)
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
await self.accept()
|
||
|
||
# Обновляем статус пользователя на онлайн
|
||
await self.update_user_presence(True)
|
||
|
||
# Отправляем подтверждение подключения
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'connection_established',
|
||
'message': 'Подключено к отслеживанию статусов',
|
||
'user_id': self.user.id
|
||
}))
|
||
|
||
logger.info(f"User {self.user.id} connected to presence WebSocket")
|
||
except Exception as e:
|
||
logger.error(f"Error in Presence WebSocket connect: {e}", exc_info=True)
|
||
await self.close(code=1011) # Internal Error
|
||
|
||
async def disconnect(self, close_code):
|
||
"""Отключение от WebSocket."""
|
||
# Обновляем статус пользователя на оффлайн
|
||
await self.update_user_presence(False)
|
||
|
||
# Покидаем группу
|
||
if self.channel_layer:
|
||
await self.channel_layer.group_discard(
|
||
self.presence_group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
logger.info(f"User {self.user.id} disconnected from presence WebSocket")
|
||
|
||
async def receive(self, text_data):
|
||
"""Получение сообщения от клиента."""
|
||
try:
|
||
data = json.loads(text_data)
|
||
message_type = data.get('type')
|
||
|
||
if message_type == 'ping':
|
||
# Отвечаем на ping для поддержания соединения
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'pong'
|
||
}))
|
||
except json.JSONDecodeError:
|
||
logger.error("Invalid JSON received in presence WebSocket")
|
||
except Exception as e:
|
||
logger.error(f"Error processing presence message: {e}", exc_info=True)
|
||
|
||
async def user_status_update(self, event):
|
||
"""Отправка обновления статуса пользователя клиенту."""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'user_status_update',
|
||
'user_id': event.get('user_id'),
|
||
'is_online': event.get('is_online'),
|
||
'last_activity': event.get('last_activity')
|
||
}))
|
||
|
||
@database_sync_to_async
|
||
def update_user_presence(self, is_online):
|
||
"""Обновление статуса присутствия пользователя."""
|
||
try:
|
||
now = timezone.now()
|
||
# Обновляем last_activity при подключении/отключении
|
||
User.objects.filter(id=self.user.id).update(
|
||
last_activity=now
|
||
)
|
||
|
||
# Отправляем обновление статуса всем подписчикам
|
||
if self.channel_layer:
|
||
from django.utils.dateparse import parse_datetime
|
||
from django.utils import timezone as tz
|
||
|
||
self.channel_layer.group_send(
|
||
self.presence_group_name,
|
||
{
|
||
'type': 'user_status_update',
|
||
'user_id': self.user.id,
|
||
'is_online': is_online,
|
||
'last_activity': now.isoformat() if now else None
|
||
}
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Error updating user presence: {e}", exc_info=True)
|