366 lines
15 KiB
Python
366 lines
15 KiB
Python
"""
|
||
WebSocket consumers для real-time уведомлений.
|
||
"""
|
||
import json
|
||
from channels.generic.websocket import AsyncWebsocketConsumer
|
||
from channels.db import database_sync_to_async
|
||
from django.utils import timezone
|
||
from django.contrib.auth import get_user_model
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
User = get_user_model()
|
||
|
||
|
||
class NotificationConsumer(AsyncWebsocketConsumer):
|
||
"""
|
||
WebSocket consumer для real-time уведомлений.
|
||
|
||
URL: ws://domain/ws/notifications/
|
||
Каждый пользователь подключается к своей персональной группе уведомлений.
|
||
"""
|
||
|
||
async def connect(self):
|
||
"""Подключение к WebSocket."""
|
||
try:
|
||
self.user = self.scope['user']
|
||
|
||
logger.info(
|
||
f"WebSocket connection attempt: user={self.user}, "
|
||
f"authenticated={self.user.is_authenticated if self.user else False}"
|
||
)
|
||
|
||
# Проверяем авторизацию
|
||
if not self.user or not self.user.is_authenticated:
|
||
logger.warning("WebSocket connection rejected: user not authenticated")
|
||
await self.close(code=4001) # Unauthorized
|
||
return
|
||
|
||
# Создаем персональную группу для пользователя
|
||
self.user_group_name = f'notifications_user_{self.user.id}'
|
||
|
||
# Проверяем что channel_layer доступен
|
||
if not self.channel_layer:
|
||
logger.error("Channel layer not available")
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
# Присоединяемся к группе пользователя
|
||
try:
|
||
await self.channel_layer.group_add(
|
||
self.user_group_name,
|
||
self.channel_name
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Error adding to channel group: {e}", exc_info=True)
|
||
await self.close(code=1011)
|
||
return
|
||
|
||
await self.accept()
|
||
|
||
# Обновляем статус пользователя на онлайн (так как он подключен к WebSocket)
|
||
await self.update_user_presence(True)
|
||
|
||
# Отправляем подтверждение подключения
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'connection_established',
|
||
'message': 'Подключено к уведомлениям',
|
||
'user_id': self.user.id
|
||
}))
|
||
|
||
# Отправляем количество непрочитанных уведомлений
|
||
unread_count = await self.get_unread_count()
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'unread_count',
|
||
'count': unread_count
|
||
}))
|
||
|
||
logger.info(f"User {self.user.id} connected to notifications WebSocket")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in WebSocket connect: {e}", exc_info=True)
|
||
await self.close(code=1011) # Internal Error
|
||
|
||
async def disconnect(self, close_code):
|
||
"""Отключение от WebSocket."""
|
||
logger.info(f"[NotificationConsumer] Disconnect called. User: {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'}, Close code: {close_code}")
|
||
|
||
# Обновляем статус пользователя на оффлайн ПЕРЕД выходом из группы
|
||
if hasattr(self, 'user') and self.user:
|
||
logger.info(f"[NotificationConsumer] Updating user {self.user.id} status to offline")
|
||
await self.update_user_presence(False)
|
||
logger.info(f"[NotificationConsumer] User {self.user.id} status updated to offline")
|
||
|
||
if hasattr(self, 'user_group_name'):
|
||
# Покидаем группу
|
||
try:
|
||
await self.channel_layer.group_discard(
|
||
self.user_group_name,
|
||
self.channel_name
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"[NotificationConsumer] Error leaving group: {e}", exc_info=True)
|
||
|
||
logger.info(f"User {self.user.id if hasattr(self, 'user') and self.user else 'Unknown'} disconnected from notifications WebSocket")
|
||
|
||
async def receive(self, text_data):
|
||
"""Получение сообщения от клиента."""
|
||
try:
|
||
data = json.loads(text_data)
|
||
message_type = data.get('type')
|
||
|
||
if message_type == 'mark_as_read':
|
||
await self.handle_mark_as_read(data)
|
||
elif message_type == 'mark_all_as_read':
|
||
await self.handle_mark_all_as_read()
|
||
elif message_type == 'get_unread_count':
|
||
await self.handle_get_unread_count()
|
||
elif message_type == 'ping':
|
||
# Простой ping для проверки соединения
|
||
# Обновляем last_activity при получении ping
|
||
await self.update_user_presence(True)
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'pong',
|
||
'timestamp': timezone.now().isoformat()
|
||
}))
|
||
else:
|
||
logger.warning(f"Unknown message type: {message_type}")
|
||
|
||
except json.JSONDecodeError:
|
||
logger.error("Invalid JSON received")
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'error',
|
||
'message': 'Invalid JSON'
|
||
}))
|
||
except Exception as e:
|
||
logger.error(f"Error in receive: {e}")
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'error',
|
||
'message': str(e)
|
||
}))
|
||
|
||
async def handle_mark_as_read(self, data):
|
||
"""Отметить уведомление как прочитанное."""
|
||
notification_id = data.get('notification_id')
|
||
|
||
if not notification_id:
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'error',
|
||
'message': 'notification_id required'
|
||
}))
|
||
return
|
||
|
||
success = await self.mark_notification_as_read(notification_id)
|
||
|
||
if success:
|
||
# Отправляем подтверждение
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'notification_read',
|
||
'notification_id': notification_id
|
||
}))
|
||
|
||
# Обновляем счетчик непрочитанных
|
||
unread_count = await self.get_unread_count()
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'unread_count',
|
||
'count': unread_count
|
||
}))
|
||
else:
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'error',
|
||
'message': 'Notification not found or access denied'
|
||
}))
|
||
|
||
async def handle_mark_all_as_read(self):
|
||
"""Отметить все уведомления как прочитанные."""
|
||
count = await self.mark_all_notifications_as_read()
|
||
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'all_notifications_read',
|
||
'count': count
|
||
}))
|
||
|
||
# Обновляем счетчик непрочитанных
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'unread_count',
|
||
'count': 0
|
||
}))
|
||
|
||
async def handle_get_unread_count(self):
|
||
"""Получить количество непрочитанных уведомлений."""
|
||
unread_count = await self.get_unread_count()
|
||
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'unread_count',
|
||
'count': unread_count
|
||
}))
|
||
|
||
# Обработчики сообщений от группы (отправляются через channel_layer)
|
||
|
||
async def notification_created(self, event):
|
||
"""
|
||
Отправка нового уведомления пользователю.
|
||
Вызывается когда создается новое уведомление для этого пользователя.
|
||
"""
|
||
notification_data = event.get('notification')
|
||
unread_count_from_event = event.get('unread_count')
|
||
|
||
# Используем счетчик из события, если он есть, иначе получаем из БД
|
||
if unread_count_from_event is not None:
|
||
unread_count = unread_count_from_event
|
||
else:
|
||
unread_count = await self.get_unread_count()
|
||
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'new_notification',
|
||
'notification': notification_data,
|
||
'unread_count': unread_count
|
||
}))
|
||
|
||
# Отправляем обновление счетчика отдельно для совместимости
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'unread_count',
|
||
'count': unread_count
|
||
}))
|
||
|
||
async def notification_updated(self, event):
|
||
"""
|
||
Отправка обновленного уведомления пользователю.
|
||
"""
|
||
notification_data = event.get('notification')
|
||
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'notification_updated',
|
||
'notification': notification_data
|
||
}))
|
||
|
||
async def notification_deleted(self, event):
|
||
"""
|
||
Уведомление об удалении уведомления.
|
||
"""
|
||
notification_id = event.get('notification_id')
|
||
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'notification_deleted',
|
||
'notification_id': notification_id
|
||
}))
|
||
|
||
async def user_status_update(self, event):
|
||
"""
|
||
Отправка обновления статуса пользователя клиенту через WebSocket уведомлений.
|
||
Вызывается когда обновляется статус пользователя (онлайн/оффлайн).
|
||
"""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'user_status_update',
|
||
'user_id': event.get('user_id'),
|
||
'is_online': event.get('is_online'),
|
||
'last_activity': event.get('last_activity')
|
||
}))
|
||
|
||
async def nav_badges_updated(self, event):
|
||
"""
|
||
Уведомление об изменении бейджей нижнего меню (чат, расписание, ДЗ и т.д.).
|
||
Клиент должен перезапросить GET /api/nav-badges/.
|
||
"""
|
||
await self.send(text_data=json.dumps({
|
||
'type': 'nav_badges_updated',
|
||
}))
|
||
|
||
# Вспомогательные методы для работы с БД
|
||
|
||
@database_sync_to_async
|
||
def get_unread_count(self):
|
||
"""Получить количество непрочитанных уведомлений."""
|
||
from .models import Notification
|
||
|
||
return Notification.objects.filter(
|
||
recipient=self.user,
|
||
channel='in_app',
|
||
is_read=False
|
||
).count()
|
||
|
||
@database_sync_to_async
|
||
def mark_notification_as_read(self, notification_id):
|
||
"""Отметить уведомление как прочитанное."""
|
||
from .models import Notification
|
||
|
||
try:
|
||
notification = Notification.objects.get(
|
||
id=notification_id,
|
||
recipient=self.user,
|
||
channel='in_app'
|
||
)
|
||
notification.mark_as_read()
|
||
return True
|
||
except Notification.DoesNotExist:
|
||
return False
|
||
|
||
@database_sync_to_async
|
||
def mark_all_notifications_as_read(self):
|
||
"""Отметить все уведомления как прочитанные."""
|
||
from .models import Notification
|
||
from django.utils import timezone
|
||
|
||
count = Notification.objects.filter(
|
||
recipient=self.user,
|
||
channel='in_app',
|
||
is_read=False
|
||
).update(
|
||
is_read=True,
|
||
read_at=timezone.now()
|
||
)
|
||
|
||
return count
|
||
|
||
@database_sync_to_async
|
||
def update_user_presence(self, is_online):
|
||
"""Обновление статуса присутствия пользователя при подключении/отключении к WebSocket уведомлений."""
|
||
try:
|
||
now = timezone.now()
|
||
# Обновляем last_activity при подключении/отключении
|
||
User.objects.filter(id=self.user.id).update(
|
||
last_activity=now
|
||
)
|
||
|
||
status_update = {
|
||
'type': 'user_status_update',
|
||
'user_id': self.user.id,
|
||
'is_online': is_online,
|
||
'last_activity': now.isoformat() if now else None
|
||
}
|
||
|
||
logger.info(f"[NotificationConsumer] Sending status update: user_id={self.user.id}, is_online={is_online}")
|
||
|
||
# Отправляем обновление статуса всем подписчикам группы user_presence
|
||
if self.channel_layer:
|
||
presence_group_name = 'user_presence'
|
||
|
||
try:
|
||
self.channel_layer.group_send(
|
||
presence_group_name,
|
||
status_update
|
||
)
|
||
logger.info(f"[NotificationConsumer] Status update sent to group '{presence_group_name}'")
|
||
except Exception as e:
|
||
logger.error(f"[NotificationConsumer] Error sending to presence group: {e}", exc_info=True)
|
||
|
||
# Также отправляем обновление в группу уведомлений пользователя,
|
||
# чтобы WebSocket уведомлений мог получать обновления статуса
|
||
if hasattr(self, 'user_group_name'):
|
||
try:
|
||
self.channel_layer.group_send(
|
||
self.user_group_name,
|
||
status_update
|
||
)
|
||
logger.info(f"[NotificationConsumer] Status update sent to user group '{self.user_group_name}'")
|
||
except Exception as e:
|
||
logger.error(f"[NotificationConsumer] Error sending to user group: {e}", exc_info=True)
|
||
|
||
logger.info(f"User {self.user.id} presence updated via notifications WebSocket: is_online={is_online}")
|
||
else:
|
||
logger.warning(f"[NotificationConsumer] Channel layer not available, cannot send status update")
|
||
except Exception as e:
|
||
logger.error(f"Error updating user presence in notifications WebSocket: {e}", exc_info=True)
|
||
|