""" WebSocket consumers для real-time уведомлений. """ import json from channels.generic.websocket import AsyncWebsocketConsumer from channels.db import database_sync_to_async from django.utils import timezone from django.contrib.auth import get_user_model import logging logger = logging.getLogger(__name__) User = get_user_model() class NotificationConsumer(AsyncWebsocketConsumer): """ WebSocket consumer для real-time уведомлений. URL: ws://domain/ws/notifications/ Каждый пользователь подключается к своей персональной группе уведомлений. """ async def connect(self): """Подключение к WebSocket.""" try: self.user = self.scope['user'] logger.info( f"WebSocket connection attempt: user={self.user}, " f"authenticated={self.user.is_authenticated if self.user else False}" ) # Проверяем авторизацию if not self.user or not self.user.is_authenticated: logger.warning("WebSocket connection rejected: user not authenticated") await self.close(code=4001) # Unauthorized return # Создаем персональную группу для пользователя self.user_group_name = f'notifications_user_{self.user.id}' # Проверяем что channel_layer доступен if not self.channel_layer: logger.error("Channel layer not available") await self.close(code=1011) return # Присоединяемся к группе пользователя try: await self.channel_layer.group_add( self.user_group_name, self.channel_name ) except Exception as e: logger.error(f"Error adding to channel group: {e}", exc_info=True) await self.close(code=1011) return await self.accept() # Обновляем статус пользователя на онлайн (так как он подключен к WebSocket) await self.update_user_presence(True) # Отправляем подтверждение подключения await self.send(text_data=json.dumps({ 'type': 'connection_established', 'message': 'Подключено к уведомлениям', 'user_id': self.user.id })) # Отправляем количество непрочитанных уведомлений unread_count = await self.get_unread_count() await self.send(text_data=json.dumps({ 'type': 'unread_count', 'count': unread_count })) logger.info(f"User {self.user.id} connected to notifications WebSocket") except Exception as e: logger.error(f"Error in WebSocket connect: {e}", exc_info=True) await self.close(code=1011) # Internal Error async def disconnect(self, close_code): """Отключение от WebSocket.""" logger.info(f"[NotificationConsumer] Disconnect called. User: {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'}, Close code: {close_code}") # Обновляем статус пользователя на оффлайн ПЕРЕД выходом из группы if hasattr(self, 'user') and self.user: logger.info(f"[NotificationConsumer] Updating user {self.user.id} status to offline") await self.update_user_presence(False) logger.info(f"[NotificationConsumer] User {self.user.id} status updated to offline") if hasattr(self, 'user_group_name'): # Покидаем группу try: await self.channel_layer.group_discard( self.user_group_name, self.channel_name ) except Exception as e: logger.error(f"[NotificationConsumer] Error leaving group: {e}", exc_info=True) logger.info(f"User {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'} disconnected from notifications WebSocket") async def receive(self, text_data): """Получение сообщения от клиента.""" try: data = json.loads(text_data) message_type = data.get('type') if message_type == 'mark_as_read': await self.handle_mark_as_read(data) elif message_type == 'mark_all_as_read': await self.handle_mark_all_as_read() elif message_type == 'get_unread_count': await self.handle_get_unread_count() elif message_type == 'ping': # Простой ping для проверки соединения # Обновляем last_activity при получении ping await self.update_user_presence(True) await self.send(text_data=json.dumps({ 'type': 'pong', 'timestamp': timezone.now().isoformat() })) else: logger.warning(f"Unknown message type: {message_type}") except json.JSONDecodeError: logger.error("Invalid JSON received") await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Invalid JSON' })) except Exception as e: logger.error(f"Error in receive: {e}") await self.send(text_data=json.dumps({ 'type': 'error', 'message': str(e) })) async def handle_mark_as_read(self, data): """Отметить уведомление как прочитанное.""" notification_id = data.get('notification_id') if not notification_id: await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'notification_id required' })) return success = await self.mark_notification_as_read(notification_id) if success: # Отправляем подтверждение await self.send(text_data=json.dumps({ 'type': 'notification_read', 'notification_id': notification_id })) # Обновляем счетчик непрочитанных unread_count = await self.get_unread_count() await self.send(text_data=json.dumps({ 'type': 'unread_count', 'count': unread_count })) else: await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Notification not found or access denied' })) async def handle_mark_all_as_read(self): """Отметить все уведомления как прочитанные.""" count = await self.mark_all_notifications_as_read() await self.send(text_data=json.dumps({ 'type': 'all_notifications_read', 'count': count })) # Обновляем счетчик непрочитанных await self.send(text_data=json.dumps({ 'type': 'unread_count', 'count': 0 })) async def handle_get_unread_count(self): """Получить количество непрочитанных уведомлений.""" unread_count = await self.get_unread_count() await self.send(text_data=json.dumps({ 'type': 'unread_count', 'count': unread_count })) # Обработчики сообщений от группы (отправляются через channel_layer) async def notification_created(self, event): """ Отправка нового уведомления пользователю. Вызывается когда создается новое уведомление для этого пользователя. """ notification_data = event.get('notification') unread_count_from_event = event.get('unread_count') # Используем счетчик из события, если он есть, иначе получаем из БД if unread_count_from_event is not None: unread_count = unread_count_from_event else: unread_count = await self.get_unread_count() await self.send(text_data=json.dumps({ 'type': 'new_notification', 'notification': notification_data, 'unread_count': unread_count })) # Отправляем обновление счетчика отдельно для совместимости await self.send(text_data=json.dumps({ 'type': 'unread_count', 'count': unread_count })) async def notification_updated(self, event): """ Отправка обновленного уведомления пользователю. """ notification_data = event.get('notification') await self.send(text_data=json.dumps({ 'type': 'notification_updated', 'notification': notification_data })) async def notification_deleted(self, event): """ Уведомление об удалении уведомления. """ notification_id = event.get('notification_id') await self.send(text_data=json.dumps({ 'type': 'notification_deleted', 'notification_id': notification_id })) async def user_status_update(self, event): """ Отправка обновления статуса пользователя клиенту через WebSocket уведомлений. Вызывается когда обновляется статус пользователя (онлайн/оффлайн). """ await self.send(text_data=json.dumps({ 'type': 'user_status_update', 'user_id': event.get('user_id'), 'is_online': event.get('is_online'), 'last_activity': event.get('last_activity') })) async def nav_badges_updated(self, event): """ Уведомление об изменении бейджей нижнего меню (чат, расписание, ДЗ и т.д.). Клиент должен перезапросить GET /api/nav-badges/. """ await self.send(text_data=json.dumps({ 'type': 'nav_badges_updated', })) # Вспомогательные методы для работы с БД @database_sync_to_async def get_unread_count(self): """Получить количество непрочитанных уведомлений.""" from .models import Notification return Notification.objects.filter( recipient=self.user, channel='in_app', is_read=False ).count() @database_sync_to_async def mark_notification_as_read(self, notification_id): """Отметить уведомление как прочитанное.""" from .models import Notification try: notification = Notification.objects.get( id=notification_id, recipient=self.user, channel='in_app' ) notification.mark_as_read() return True except Notification.DoesNotExist: return False @database_sync_to_async def mark_all_notifications_as_read(self): """Отметить все уведомления как прочитанные.""" from .models import Notification from django.utils import timezone count = Notification.objects.filter( recipient=self.user, channel='in_app', is_read=False ).update( is_read=True, read_at=timezone.now() ) return count @database_sync_to_async def update_user_presence(self, is_online): """Обновление статуса присутствия пользователя при подключении/отключении к WebSocket уведомлений.""" try: now = timezone.now() # Обновляем last_activity при подключении/отключении User.objects.filter(id=self.user.id).update( last_activity=now ) status_update = { 'type': 'user_status_update', 'user_id': self.user.id, 'is_online': is_online, 'last_activity': now.isoformat() if now else None } logger.info(f"[NotificationConsumer] Sending status update: user_id={self.user.id}, is_online={is_online}") # Отправляем обновление статуса всем подписчикам группы user_presence if self.channel_layer: presence_group_name = 'user_presence' try: self.channel_layer.group_send( presence_group_name, status_update ) logger.info(f"[NotificationConsumer] Status update sent to group '{presence_group_name}'") except Exception as e: logger.error(f"[NotificationConsumer] Error sending to presence group: {e}", exc_info=True) # Также отправляем обновление в группу уведомлений пользователя, # чтобы WebSocket уведомлений мог получать обновления статуса if hasattr(self, 'user_group_name'): try: self.channel_layer.group_send( self.user_group_name, status_update ) logger.info(f"[NotificationConsumer] Status update sent to user group '{self.user_group_name}'") except Exception as e: logger.error(f"[NotificationConsumer] Error sending to user group: {e}", exc_info=True) logger.info(f"User {self.user.id} presence updated via notifications WebSocket: is_online={is_online}") else: logger.warning(f"[NotificationConsumer] Channel layer not available, cannot send status update") except Exception as e: logger.error(f"Error updating user presence in notifications WebSocket: {e}", exc_info=True)