uchill/backend/apps/notifications/consumers.py

366 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebSocket consumers для real-time уведомлений.
"""
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.utils import timezone
from django.contrib.auth import get_user_model
import logging
logger = logging.getLogger(__name__)
User = get_user_model()
class NotificationConsumer(AsyncWebsocketConsumer):
"""
WebSocket consumer для real-time уведомлений.
URL: ws://domain/ws/notifications/
Каждый пользователь подключается к своей персональной группе уведомлений.
"""
async def connect(self):
"""Подключение к WebSocket."""
try:
self.user = self.scope['user']
logger.info(
f"WebSocket connection attempt: user={self.user}, "
f"authenticated={self.user.is_authenticated if self.user else False}"
)
# Проверяем авторизацию
if not self.user or not self.user.is_authenticated:
logger.warning("WebSocket connection rejected: user not authenticated")
await self.close(code=4001) # Unauthorized
return
# Создаем персональную группу для пользователя
self.user_group_name = f'notifications_user_{self.user.id}'
# Проверяем что channel_layer доступен
if not self.channel_layer:
logger.error("Channel layer not available")
await self.close(code=1011)
return
# Присоединяемся к группе пользователя
try:
await self.channel_layer.group_add(
self.user_group_name,
self.channel_name
)
except Exception as e:
logger.error(f"Error adding to channel group: {e}", exc_info=True)
await self.close(code=1011)
return
await self.accept()
# Обновляем статус пользователя на онлайн (так как он подключен к WebSocket)
await self.update_user_presence(True)
# Отправляем подтверждение подключения
await self.send(text_data=json.dumps({
'type': 'connection_established',
'message': 'Подключено к уведомлениям',
'user_id': self.user.id
}))
# Отправляем количество непрочитанных уведомлений
unread_count = await self.get_unread_count()
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': unread_count
}))
logger.info(f"User {self.user.id} connected to notifications WebSocket")
except Exception as e:
logger.error(f"Error in WebSocket connect: {e}", exc_info=True)
await self.close(code=1011) # Internal Error
async def disconnect(self, close_code):
"""Отключение от WebSocket."""
logger.info(f"[NotificationConsumer] Disconnect called. User: {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'}, Close code: {close_code}")
# Обновляем статус пользователя на оффлайн ПЕРЕД выходом из группы
if hasattr(self, 'user') and self.user:
logger.info(f"[NotificationConsumer] Updating user {self.user.id} status to offline")
await self.update_user_presence(False)
logger.info(f"[NotificationConsumer] User {self.user.id} status updated to offline")
if hasattr(self, 'user_group_name'):
# Покидаем группу
try:
await self.channel_layer.group_discard(
self.user_group_name,
self.channel_name
)
except Exception as e:
logger.error(f"[NotificationConsumer] Error leaving group: {e}", exc_info=True)
logger.info(f"User {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'} disconnected from notifications WebSocket")
async def receive(self, text_data):
"""Получение сообщения от клиента."""
try:
data = json.loads(text_data)
message_type = data.get('type')
if message_type == 'mark_as_read':
await self.handle_mark_as_read(data)
elif message_type == 'mark_all_as_read':
await self.handle_mark_all_as_read()
elif message_type == 'get_unread_count':
await self.handle_get_unread_count()
elif message_type == 'ping':
# Простой ping для проверки соединения
# Обновляем last_activity при получении ping
await self.update_user_presence(True)
await self.send(text_data=json.dumps({
'type': 'pong',
'timestamp': timezone.now().isoformat()
}))
else:
logger.warning(f"Unknown message type: {message_type}")
except json.JSONDecodeError:
logger.error("Invalid JSON received")
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'Invalid JSON'
}))
except Exception as e:
logger.error(f"Error in receive: {e}")
await self.send(text_data=json.dumps({
'type': 'error',
'message': str(e)
}))
async def handle_mark_as_read(self, data):
"""Отметить уведомление как прочитанное."""
notification_id = data.get('notification_id')
if not notification_id:
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'notification_id required'
}))
return
success = await self.mark_notification_as_read(notification_id)
if success:
# Отправляем подтверждение
await self.send(text_data=json.dumps({
'type': 'notification_read',
'notification_id': notification_id
}))
# Обновляем счетчик непрочитанных
unread_count = await self.get_unread_count()
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': unread_count
}))
else:
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'Notification not found or access denied'
}))
async def handle_mark_all_as_read(self):
"""Отметить все уведомления как прочитанные."""
count = await self.mark_all_notifications_as_read()
await self.send(text_data=json.dumps({
'type': 'all_notifications_read',
'count': count
}))
# Обновляем счетчик непрочитанных
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': 0
}))
async def handle_get_unread_count(self):
"""Получить количество непрочитанных уведомлений."""
unread_count = await self.get_unread_count()
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': unread_count
}))
# Обработчики сообщений от группы (отправляются через channel_layer)
async def notification_created(self, event):
"""
Отправка нового уведомления пользователю.
Вызывается когда создается новое уведомление для этого пользователя.
"""
notification_data = event.get('notification')
unread_count_from_event = event.get('unread_count')
# Используем счетчик из события, если он есть, иначе получаем из БД
if unread_count_from_event is not None:
unread_count = unread_count_from_event
else:
unread_count = await self.get_unread_count()
await self.send(text_data=json.dumps({
'type': 'new_notification',
'notification': notification_data,
'unread_count': unread_count
}))
# Отправляем обновление счетчика отдельно для совместимости
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': unread_count
}))
async def notification_updated(self, event):
"""
Отправка обновленного уведомления пользователю.
"""
notification_data = event.get('notification')
await self.send(text_data=json.dumps({
'type': 'notification_updated',
'notification': notification_data
}))
async def notification_deleted(self, event):
"""
Уведомление об удалении уведомления.
"""
notification_id = event.get('notification_id')
await self.send(text_data=json.dumps({
'type': 'notification_deleted',
'notification_id': notification_id
}))
async def user_status_update(self, event):
"""
Отправка обновления статуса пользователя клиенту через WebSocket уведомлений.
Вызывается когда обновляется статус пользователя (онлайн/оффлайн).
"""
await self.send(text_data=json.dumps({
'type': 'user_status_update',
'user_id': event.get('user_id'),
'is_online': event.get('is_online'),
'last_activity': event.get('last_activity')
}))
async def nav_badges_updated(self, event):
"""
Уведомление об изменении бейджей нижнего меню (чат, расписание, ДЗ и т.д.).
Клиент должен перезапросить GET /api/nav-badges/.
"""
await self.send(text_data=json.dumps({
'type': 'nav_badges_updated',
}))
# Вспомогательные методы для работы с БД
@database_sync_to_async
def get_unread_count(self):
"""Получить количество непрочитанных уведомлений."""
from .models import Notification
return Notification.objects.filter(
recipient=self.user,
channel='in_app',
is_read=False
).count()
@database_sync_to_async
def mark_notification_as_read(self, notification_id):
"""Отметить уведомление как прочитанное."""
from .models import Notification
try:
notification = Notification.objects.get(
id=notification_id,
recipient=self.user,
channel='in_app'
)
notification.mark_as_read()
return True
except Notification.DoesNotExist:
return False
@database_sync_to_async
def mark_all_notifications_as_read(self):
"""Отметить все уведомления как прочитанные."""
from .models import Notification
from django.utils import timezone
count = Notification.objects.filter(
recipient=self.user,
channel='in_app',
is_read=False
).update(
is_read=True,
read_at=timezone.now()
)
return count
@database_sync_to_async
def update_user_presence(self, is_online):
"""Обновление статуса присутствия пользователя при подключении/отключении к WebSocket уведомлений."""
try:
now = timezone.now()
# Обновляем last_activity при подключении/отключении
User.objects.filter(id=self.user.id).update(
last_activity=now
)
status_update = {
'type': 'user_status_update',
'user_id': self.user.id,
'is_online': is_online,
'last_activity': now.isoformat() if now else None
}
logger.info(f"[NotificationConsumer] Sending status update: user_id={self.user.id}, is_online={is_online}")
# Отправляем обновление статуса всем подписчикам группы user_presence
if self.channel_layer:
presence_group_name = 'user_presence'
try:
self.channel_layer.group_send(
presence_group_name,
status_update
)
logger.info(f"[NotificationConsumer] Status update sent to group '{presence_group_name}'")
except Exception as e:
logger.error(f"[NotificationConsumer] Error sending to presence group: {e}", exc_info=True)
# Также отправляем обновление в группу уведомлений пользователя,
# чтобы WebSocket уведомлений мог получать обновления статуса
if hasattr(self, 'user_group_name'):
try:
self.channel_layer.group_send(
self.user_group_name,
status_update
)
logger.info(f"[NotificationConsumer] Status update sent to user group '{self.user_group_name}'")
except Exception as e:
logger.error(f"[NotificationConsumer] Error sending to user group: {e}", exc_info=True)
logger.info(f"User {self.user.id} presence updated via notifications WebSocket: is_online={is_online}")
else:
logger.warning(f"[NotificationConsumer] Channel layer not available, cannot send status update")
except Exception as e:
logger.error(f"Error updating user presence in notifications WebSocket: {e}", exc_info=True)