""" Базовые views для конфигурации. """ import time import mimetypes from pathlib import Path from django.http import JsonResponse, FileResponse, Http404 from django.db import connection from django.core.cache import cache from django.conf import settings import redis import logging logger = logging.getLogger(__name__) def serve_media(request, path): """ Безопасная раздача медиа-файлов (работает при DEBUG=False при обращении к Django на порту 8123). Проверяет path на path traversal; отдаёт файл из MEDIA_ROOT. """ media_root = Path(settings.MEDIA_ROOT).resolve() # Запрет path traversal: путь должен оставаться внутри MEDIA_ROOT file_path = (media_root / path).resolve() if not str(file_path).startswith(str(media_root)) or not file_path.exists(): raise Http404("Media file not found") if file_path.is_dir(): raise Http404("Media file not found") content_type, _ = mimetypes.guess_type(str(file_path)) response = FileResponse( open(file_path, "rb"), as_attachment=False, filename=file_path.name, content_type=content_type or "application/octet-stream", ) response["Cache-Control"] = "public, max-age=604800" # 7 дней return response def health_check(request): """ Health check endpoint для мониторинга. Проверяет доступность БД, Redis, Celery и WebSocket. """ start_time = time.time() health_status = { 'status': 'healthy', 'timestamp': time.time(), 'checks': {} } # Проверка базы данных try: db_start = time.time() with connection.cursor() as cursor: cursor.execute('SELECT 1') db_time = (time.time() - db_start) * 1000 # в миллисекундах health_status['checks']['database'] = { 'status': 'healthy', 'message': 'Database connection successful', 'response_time_ms': round(db_time, 2) } except Exception as e: health_status['status'] = 'unhealthy' health_status['checks']['database'] = { 'status': 'unhealthy', 'message': f'Database error: {str(e)}', 'response_time_ms': None } logger.error(f'Database health check failed: {e}', exc_info=True) # Проверка Redis try: redis_start = time.time() cache.set('health_check', 'ok', 10) cache_value = cache.get('health_check') redis_time = (time.time() - redis_start) * 1000 if cache_value == 'ok': health_status['checks']['redis'] = { 'status': 'healthy', 'message': 'Redis connection successful', 'response_time_ms': round(redis_time, 2) } else: health_status['status'] = 'unhealthy' health_status['checks']['redis'] = { 'status': 'unhealthy', 'message': 'Redis cache verification failed', 'response_time_ms': round(redis_time, 2) } except Exception as e: health_status['status'] = 'unhealthy' health_status['checks']['redis'] = { 'status': 'unhealthy', 'message': f'Redis error: {str(e)}', 'response_time_ms': None } logger.error(f'Redis health check failed: {e}', exc_info=True) # Проверка Celery (через Redis) try: celery_start = time.time() from celery import current_app inspect = current_app.control.inspect() active_workers = inspect.active() celery_time = (time.time() - celery_start) * 1000 if active_workers is not None: worker_count = len(active_workers) health_status['checks']['celery'] = { 'status': 'healthy', 'message': f'Celery is running with {worker_count} active worker(s)', 'worker_count': worker_count, 'response_time_ms': round(celery_time, 2) } else: health_status['status'] = 'unhealthy' health_status['checks']['celery'] = { 'status': 'unhealthy', 'message': 'No active Celery workers found', 'response_time_ms': round(celery_time, 2) } except Exception as e: # Celery может быть недоступен, но это не критично для базового health check health_status['checks']['celery'] = { 'status': 'warning', 'message': f'Celery check failed: {str(e)}', 'response_time_ms': None } logger.warning(f'Celery health check failed: {e}') # Проверка WebSocket (Channels) try: ws_start = time.time() from channels.layers import get_channel_layer channel_layer = get_channel_layer() if channel_layer: # Простая проверка доступности channel layer ws_time = (time.time() - ws_start) * 1000 health_status['checks']['websocket'] = { 'status': 'healthy', 'message': 'WebSocket channel layer is available', 'response_time_ms': round(ws_time, 2) } else: health_status['checks']['websocket'] = { 'status': 'warning', 'message': 'WebSocket channel layer not configured', 'response_time_ms': None } except Exception as e: health_status['checks']['websocket'] = { 'status': 'warning', 'message': f'WebSocket check failed: {str(e)}', 'response_time_ms': None } logger.warning(f'WebSocket health check failed: {e}') # Общее время выполнения total_time = (time.time() - start_time) * 1000 health_status['total_response_time_ms'] = round(total_time, 2) # Добавляем информацию о версии и окружении health_status['version'] = getattr(settings, 'VERSION', '1.0.0') health_status['environment'] = 'production' if not settings.DEBUG else 'development' # Возвращаем 200 если все критичные сервисы здоровы, 503 если есть проблемы critical_checks = ['database', 'redis'] critical_healthy = all( health_status['checks'].get(check, {}).get('status') == 'healthy' for check in critical_checks ) status_code = 200 if critical_healthy else 503 return JsonResponse(health_status, status=status_code)