uchill/backend/apps/video/services/janus_client.py

319 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Клиент для работы с Janus Gateway API.
Параллельная реализация с ion-sfu для сравнения.
"""
import requests
import logging
from typing import Dict, Any, Optional
from django.conf import settings
logger = logging.getLogger(__name__)
class JanusClient:
"""Клиент для взаимодействия с Janus Gateway."""
def __init__(self, base_url: Optional[str] = None):
"""
Инициализация клиента Janus.
Args:
base_url: Базовый URL Janus API (например, http://localhost:8088/janus)
"""
self.base_url = base_url or getattr(settings, 'JANUS_HTTP_URL', 'http://localhost:8088/janus')
self.session_id = None
self.plugin_handle_id = None
def create_session(self) -> Dict[str, Any]:
"""
Создать новую сессию Janus.
Returns:
Словарь с информацией о сессии, включая session_id
"""
try:
response = requests.post(
self.base_url,
json={"janus": "create", "transaction": self._generate_transaction_id()},
timeout=5
)
response.raise_for_status()
data = response.json()
if data.get('janus') == 'success':
self.session_id = data.get('data', {}).get('id')
logger.info(f"Janus session created: {self.session_id}")
return data
else:
raise Exception(f"Failed to create Janus session: {data}")
except Exception as e:
logger.error(f"Error creating Janus session: {e}")
raise
def attach_plugin(self, plugin: str = "janus.plugin.videoroom") -> Dict[str, Any]:
"""
Подключить плагин к сессии.
Args:
plugin: Имя плагина (по умолчанию videoroom)
Returns:
Словарь с информацией о плагине, включая handle_id
"""
if not self.session_id:
raise Exception("No active session. Call create_session() first.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}",
json={
"janus": "attach",
"plugin": plugin,
"transaction": self._generate_transaction_id()
},
timeout=5
)
response.raise_for_status()
data = response.json()
if data.get('janus') == 'success':
self.plugin_handle_id = data.get('data', {}).get('id')
logger.info(f"Plugin attached: {plugin}, handle: {self.plugin_handle_id}")
return data
else:
raise Exception(f"Failed to attach plugin: {data}")
except Exception as e:
logger.error(f"Error attaching plugin: {e}")
raise
def create_room(self, room_id: int, description: str = "", publishers: int = 6, secret: str = "adminpwd") -> Dict[str, Any]:
"""
Создать новую видеокомнату.
Args:
room_id: ID комнаты
description: Описание комнаты
publishers: Максимальное количество publishers
secret: Секретный ключ для управления комнатой
Returns:
Информация о созданной комнате
"""
if not self.plugin_handle_id:
raise Exception("No plugin handle. Call attach_plugin() first.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}/{self.plugin_handle_id}",
json={
"janus": "message",
"transaction": self._generate_transaction_id(),
"body": {
"request": "create",
"room": room_id,
"description": description,
"publishers": publishers,
"bitrate": 512000,
"fir_freq": 10,
"audiocodec": "opus",
"videocodec": "vp8",
"record": False,
"rec_dir": "/recordings",
"secret": secret, # Секретный ключ для управления
"pin": "" # Без PIN для входа
}
},
timeout=5
)
response.raise_for_status()
data = response.json()
logger.info(f"Room created: {room_id}")
return data
except Exception as e:
logger.error(f"Error creating room: {e}")
raise
def destroy_room(self, room_id: int, secret: str = "adminpwd") -> Dict[str, Any]:
"""
Удалить видеокомнату.
Args:
room_id: ID комнаты для удаления
secret: Секретный ключ (должен совпадать с тем, что использовался при создании)
Returns:
Результат операции
"""
if not self.plugin_handle_id:
raise Exception("No plugin handle. Call attach_plugin() first.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}/{self.plugin_handle_id}",
json={
"janus": "message",
"transaction": self._generate_transaction_id(),
"body": {
"request": "destroy",
"room": room_id,
"secret": secret # Обязательно для удаления
}
},
timeout=5
)
response.raise_for_status()
data = response.json()
logger.info(f"Room destroyed: {room_id}")
return data
except Exception as e:
logger.error(f"Error destroying room: {e}")
raise
def list_rooms(self) -> Dict[str, Any]:
"""
Получить список всех комнат.
Returns:
Список комнат
"""
if not self.plugin_handle_id:
raise Exception("No plugin handle. Call attach_plugin() first.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}/{self.plugin_handle_id}",
json={
"janus": "message",
"transaction": self._generate_transaction_id(),
"body": {
"request": "list"
}
},
timeout=5
)
response.raise_for_status()
data = response.json()
return data
except Exception as e:
logger.error(f"Error listing rooms: {e}")
raise
def get_room_info(self, room_id: int) -> Dict[str, Any]:
"""
Получить информацию о комнате.
Args:
room_id: ID комнаты
Returns:
Информация о комнате
"""
if not self.plugin_handle_id:
raise Exception("No plugin handle. Call attach_plugin() first.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}/{self.plugin_handle_id}",
json={
"janus": "message",
"transaction": self._generate_transaction_id(),
"body": {
"request": "exists",
"room": room_id
}
},
timeout=5
)
response.raise_for_status()
data = response.json()
return data
except Exception as e:
logger.error(f"Error getting room info: {e}")
raise
def keepalive(self) -> Dict[str, Any]:
"""
Отправить keepalive для поддержания сессии.
Returns:
Результат операции
"""
if not self.session_id:
raise Exception("No active session.")
try:
response = requests.post(
f"{self.base_url}/{self.session_id}",
json={
"janus": "keepalive",
"transaction": self._generate_transaction_id()
},
timeout=5
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error sending keepalive: {e}")
raise
def destroy_session(self) -> None:
"""Уничтожить сессию Janus."""
if not self.session_id:
return
try:
requests.post(
f"{self.base_url}/{self.session_id}",
json={
"janus": "destroy",
"transaction": self._generate_transaction_id()
},
timeout=5
)
logger.info(f"Janus session destroyed: {self.session_id}")
self.session_id = None
self.plugin_handle_id = None
except Exception as e:
logger.error(f"Error destroying session: {e}")
@staticmethod
def _generate_transaction_id() -> str:
"""Генерировать уникальный ID транзакции."""
import random
import string
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def __enter__(self):
"""Context manager entry."""
self.create_session()
self.attach_plugin()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.destroy_session()
# Вспомогательная функция для быстрого получения клиента
def get_janus_client() -> JanusClient:
"""
Получить настроенный экземпляр JanusClient.
Returns:
JanusClient: Клиент для работы с Janus Gateway
"""
return JanusClient()