uchill/backend/config/views.py

179 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Базовые views для конфигурации.
"""
import time
import mimetypes
from pathlib import Path
from django.http import JsonResponse, FileResponse, Http404
from django.db import connection
from django.core.cache import cache
from django.conf import settings
import redis
import logging
logger = logging.getLogger(__name__)
def serve_media(request, path):
"""
Безопасная раздача медиа-файлов (работает при DEBUG=False при обращении к Django на порту 8123).
Проверяет path на path traversal; отдаёт файл из MEDIA_ROOT.
"""
media_root = Path(settings.MEDIA_ROOT).resolve()
# Запрет path traversal: путь должен оставаться внутри MEDIA_ROOT
file_path = (media_root / path).resolve()
if not str(file_path).startswith(str(media_root)) or not file_path.exists():
raise Http404("Media file not found")
if file_path.is_dir():
raise Http404("Media file not found")
content_type, _ = mimetypes.guess_type(str(file_path))
response = FileResponse(
open(file_path, "rb"),
as_attachment=False,
filename=file_path.name,
content_type=content_type or "application/octet-stream",
)
response["Cache-Control"] = "public, max-age=604800" # 7 дней
return response
def health_check(request):
"""
Health check endpoint для мониторинга.
Проверяет доступность БД, Redis, Celery и WebSocket.
"""
start_time = time.time()
health_status = {
'status': 'healthy',
'timestamp': time.time(),
'checks': {}
}
# Проверка базы данных
try:
db_start = time.time()
with connection.cursor() as cursor:
cursor.execute('SELECT 1')
db_time = (time.time() - db_start) * 1000 # в миллисекундах
health_status['checks']['database'] = {
'status': 'healthy',
'message': 'Database connection successful',
'response_time_ms': round(db_time, 2)
}
except Exception as e:
health_status['status'] = 'unhealthy'
health_status['checks']['database'] = {
'status': 'unhealthy',
'message': f'Database error: {str(e)}',
'response_time_ms': None
}
logger.error(f'Database health check failed: {e}', exc_info=True)
# Проверка Redis
try:
redis_start = time.time()
cache.set('health_check', 'ok', 10)
cache_value = cache.get('health_check')
redis_time = (time.time() - redis_start) * 1000
if cache_value == 'ok':
health_status['checks']['redis'] = {
'status': 'healthy',
'message': 'Redis connection successful',
'response_time_ms': round(redis_time, 2)
}
else:
health_status['status'] = 'unhealthy'
health_status['checks']['redis'] = {
'status': 'unhealthy',
'message': 'Redis cache verification failed',
'response_time_ms': round(redis_time, 2)
}
except Exception as e:
health_status['status'] = 'unhealthy'
health_status['checks']['redis'] = {
'status': 'unhealthy',
'message': f'Redis error: {str(e)}',
'response_time_ms': None
}
logger.error(f'Redis health check failed: {e}', exc_info=True)
# Проверка Celery (через Redis)
try:
celery_start = time.time()
from celery import current_app
inspect = current_app.control.inspect()
active_workers = inspect.active()
celery_time = (time.time() - celery_start) * 1000
if active_workers is not None:
worker_count = len(active_workers)
health_status['checks']['celery'] = {
'status': 'healthy',
'message': f'Celery is running with {worker_count} active worker(s)',
'worker_count': worker_count,
'response_time_ms': round(celery_time, 2)
}
else:
health_status['status'] = 'unhealthy'
health_status['checks']['celery'] = {
'status': 'unhealthy',
'message': 'No active Celery workers found',
'response_time_ms': round(celery_time, 2)
}
except Exception as e:
# Celery может быть недоступен, но это не критично для базового health check
health_status['checks']['celery'] = {
'status': 'warning',
'message': f'Celery check failed: {str(e)}',
'response_time_ms': None
}
logger.warning(f'Celery health check failed: {e}')
# Проверка WebSocket (Channels)
try:
ws_start = time.time()
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()
if channel_layer:
# Простая проверка доступности channel layer
ws_time = (time.time() - ws_start) * 1000
health_status['checks']['websocket'] = {
'status': 'healthy',
'message': 'WebSocket channel layer is available',
'response_time_ms': round(ws_time, 2)
}
else:
health_status['checks']['websocket'] = {
'status': 'warning',
'message': 'WebSocket channel layer not configured',
'response_time_ms': None
}
except Exception as e:
health_status['checks']['websocket'] = {
'status': 'warning',
'message': f'WebSocket check failed: {str(e)}',
'response_time_ms': None
}
logger.warning(f'WebSocket health check failed: {e}')
# Общее время выполнения
total_time = (time.time() - start_time) * 1000
health_status['total_response_time_ms'] = round(total_time, 2)
# Добавляем информацию о версии и окружении
health_status['version'] = getattr(settings, 'VERSION', '1.0.0')
health_status['environment'] = 'production' if not settings.DEBUG else 'development'
# Возвращаем 200 если все критичные сервисы здоровы, 503 если есть проблемы
critical_checks = ['database', 'redis']
critical_healthy = all(
health_status['checks'].get(check, {}).get('status') == 'healthy'
for check in critical_checks
)
status_code = 200 if critical_healthy else 503
return JsonResponse(health_status, status=status_code)