""" Сервис для AI проверки домашних заданий. Поддерживает ИИ-агенты из БД (RouterAI и др. OpenAI-совместимые) и fallback на OPENAI_API_KEY из настроек. RouterAI: https://routerai.ru/docs/reference OpenAI-совместимый: POST https://routerai.ru/api/v1/chat/completions Модели: openai/gpt-4o-mini, anthropic/claude-3-5-sonnet и др. — см. https://routerai.ru/models Авторизация: Bearer Поддерживает multimodal (текст + base64 изображения). """ import base64 import io import logging import os import subprocess import tempfile import requests from django.conf import settings from django.db.models import F from typing import Dict, List, Optional, Tuple, Any logger = logging.getLogger(__name__) # Максимальный размер одного изображения для отправки в AI (байты) MAX_IMAGE_SIZE = 8 * 1024 * 1024 # 8 МБ # Расширения изображений, которые отправляем в запрос IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'} # Расширения текстовых файлов: читаем содержимое и подставляем в промпт (задание/решение) TEXT_EXTENSIONS = {'.txt', '.py', '.md', '.json', '.csv', '.xml', '.html', '.css', '.js', '.ts', '.jsx', '.tsx', '.c', '.cpp', '.h', '.java', '.kt', '.rs', '.go', '.rb', '.php', '.sql', '.yaml', '.yml', '.ini', '.cfg', '.log'} # Максимальный размер текстового файла для чтения (байты) MAX_TEXT_FILE_SIZE = 25 * 1024 * 1024 # 2 МБ # Максимум символов из одного файла в промпт (чтобы не перегружать токены) MAX_CHARS_PER_FILE = 800_000 def _read_file_text(file_path: str) -> Optional[str]: """ Читает текстовый файл (.txt, .py, .md и т.д.) и возвращает содержимое. Кодировки: utf-8, затем cp1251, latin-1. Ограничение размера и длины — MAX_TEXT_FILE_SIZE, MAX_CHARS_PER_FILE. """ if not file_path or not os.path.isfile(file_path): return None ext = os.path.splitext(file_path)[1].lower() if ext not in TEXT_EXTENSIONS: return None try: size = os.path.getsize(file_path) if size > MAX_TEXT_FILE_SIZE: logger.warning("AI check: пропуск текстового файла (слишком большой %s bytes): %s", size, file_path) return None for encoding in ('utf-8', 'cp1251', 'latin-1'): try: with open(file_path, 'r', encoding=encoding) as f: text = f.read(MAX_CHARS_PER_FILE + 1) if len(text) > MAX_CHARS_PER_FILE: text = text[:MAX_CHARS_PER_FILE] + "\n\n[... файл обрезан ...]" return text except UnicodeDecodeError: continue return None except Exception as e: logger.warning("AI check: не удалось прочитать текстовый файл %s: %s", file_path, e) return None def _extract_pdf_text_pymupdf(data: bytes) -> Optional[str]: """Fallback: извлечение текста из PDF через PyMuPDF (часто справляется там, где pypdf возвращает пусто).""" if not data or len(data) > MAX_TEXT_FILE_SIZE: return None try: import pymupdf doc = pymupdf.open(stream=data, filetype="pdf") parts = [] total = 0 for page in doc: if total >= MAX_CHARS_PER_FILE: parts.append("\n[... PDF обрезан ...]") break text = page.get_text() or "" if total + len(text) > MAX_CHARS_PER_FILE: text = text[: MAX_CHARS_PER_FILE - total] parts.append(text) total += len(text) doc.close() return "\n".join(parts).strip() or None except ImportError: logger.debug("AI check: pymupdf не установлен, fallback для PDF недоступен") return None except Exception as e: logger.warning("AI check: PyMuPDF не смог извлечь текст из PDF: %s", e) return None def _extract_pdf_text_pdfminer(data: bytes) -> Optional[str]: """Извлечение текста из PDF через pdfminer.six (часто справляется с кодировками и структурой).""" if not data or len(data) > MAX_TEXT_FILE_SIZE: return None try: from pdfminer.high_level import extract_text from pdfminer.layout import LAParams text = extract_text(io.BytesIO(data), laparams=LAParams(line_margin=0.5, word_margin=0.1)) if not text or not text.strip(): return None if len(text) > MAX_CHARS_PER_FILE: text = text[:MAX_CHARS_PER_FILE] + "\n\n[... PDF обрезан ...]" return text.strip() except ImportError: logger.debug("AI check: pdfminer.six не установлен") return None except Exception as e: logger.warning("AI check: pdfminer исключение: %s", e) return None def _write_text_to_txt_and_read(text: str) -> Optional[str]: """Пишем текст во временный .txt и читаем обратно — результат «из файла».""" if not text or not text.strip(): return None try: fd, path = tempfile.mkstemp(suffix=".txt", prefix="pdf2txt_", text=True) try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(text[:MAX_CHARS_PER_FILE]) if len(text) > MAX_CHARS_PER_FILE: f.write("\n\n[... обрезано ...]") with open(path, "r", encoding="utf-8") as f: return f.read().strip() or None finally: try: os.unlink(path) except OSError: pass except Exception as e: logger.warning("AI check: не удалось записать/прочитать .txt: %s", e) return text.strip() def _convert_pdf_to_txt_and_read(pdf_path: Optional[str] = None, pdf_bytes: Optional[bytes] = None) -> Optional[str]: """ Получаем PDF → сразу конвертируем в .txt файл → читаем текст из .txt. Так текст всегда получаем из файла. Порядок: pdftotext (если есть) → pypdf → PyMuPDF → pdfminer. """ if pdf_path and not os.path.isfile(pdf_path): pdf_path = None if not pdf_path and not pdf_bytes: return None if pdf_bytes: if len(pdf_bytes) > MAX_TEXT_FILE_SIZE: return None if len(pdf_bytes) >= 5 and pdf_bytes[:5] != b"%PDF-": logger.warning("AI check: байты не похожи на PDF, len=%s", len(pdf_bytes)) return None # Если только байты — пишем во временный PDF, дальше работаем с путём temp_pdf_path = None if not pdf_path and pdf_bytes: try: fd, temp_pdf_path = tempfile.mkstemp(suffix=".pdf") os.close(fd) with open(temp_pdf_path, "wb") as f: f.write(pdf_bytes) pdf_path = temp_pdf_path except Exception as e: logger.warning("AI check: не удалось записать PDF во временный файл: %s", e) return None txt_path = None try: # 1) pdftotext (poppler) — часто надёжнее библиотек try: fd, txt_path = tempfile.mkstemp(suffix=".txt", prefix="pdf2txt_", text=True) os.close(fd) r = subprocess.run( ["pdftotext", "-enc", "UTF-8", pdf_path, txt_path], capture_output=True, timeout=60, cwd=os.path.dirname(pdf_path) or None, ) if r.returncode == 0 and os.path.isfile(txt_path): with open(txt_path, "r", encoding="utf-8", errors="replace") as f: text = f.read(MAX_CHARS_PER_FILE + 1) if len(text) > MAX_CHARS_PER_FILE: text = text[:MAX_CHARS_PER_FILE] + "\n\n[... обрезано ...]" if text.strip(): logger.info("AI check: pdftotext сконвертировал PDF в .txt, прочитано %s символов", len(text)) return text.strip() logger.info("AI check: pdftotext вернул код 0, но .txt пустой (возможно скан без текстового слоя)") else: stderr = (r.stderr or b"").decode("utf-8", errors="replace").strip() if r.stderr else "" logger.warning("AI check: pdftotext не сработал returncode=%s stderr=%s", r.returncode, stderr[:500]) except FileNotFoundError: logger.warning("AI check: pdftotext не найден в PATH (в Docker установите poppler-utils)") except Exception as e: logger.warning("AI check: pdftotext исключение: %s", e) finally: if txt_path and os.path.isfile(txt_path): try: os.unlink(txt_path) except OSError: pass txt_path = None # 2) Извлечение библиотеками и запись в .txt — пробуем все по очереди with open(pdf_path, "rb") as f: data = f.read() logger.info("AI check: PDF прочитан с диска, %s bytes, пробуем извлечь текст", len(data)) raw_text = None # pdfminer часто лучше для PDF с кириллицей и сложной вёрсткой raw_text = _extract_pdf_text_pdfminer(data) if raw_text and raw_text.strip(): logger.info("AI check: pdfminer извлёк %s символов", len(raw_text)) else: logger.info("AI check: pdfminer вернул пустой текст") if not (raw_text and raw_text.strip()): raw_text = _extract_pdf_text_pymupdf(data) if raw_text and raw_text.strip(): logger.info("AI check: PyMuPDF извлёк %s символов", len(raw_text)) else: logger.info("AI check: PyMuPDF вернул пустой текст") if not (raw_text and raw_text.strip()): try: from pypdf import PdfReader reader = PdfReader(io.BytesIO(data)) parts = [] total = 0 for page in reader.pages: if total >= MAX_CHARS_PER_FILE: break t = page.extract_text() or "" if total + len(t) > MAX_CHARS_PER_FILE: t = t[: MAX_CHARS_PER_FILE - total] parts.append(t) total += len(t) raw_text = "\n".join(parts).strip() or None if raw_text: logger.info("AI check: pypdf извлёк %s символов", len(raw_text)) else: logger.info("AI check: pypdf вернул пустой текст (страниц: %s)", len(reader.pages)) except Exception as e: logger.warning("AI check: pypdf исключение: %s", e) if not (raw_text and raw_text.strip()): logger.warning( "AI check: ни один метод не извлек текст. Возможные причины: PDF — скан (нет текстового слоя), защищён, или нестандартная кодировка. Путь: %s", pdf_path, ) return None # Конверт: пишем в .txt и читаем из файла return _write_text_to_txt_and_read(raw_text) finally: if temp_pdf_path and os.path.isfile(temp_pdf_path): try: os.unlink(temp_pdf_path) except OSError: pass def _resolve_media_path(file_path: str) -> Optional[str]: """Проверяем путь к файлу; если не найден и путь относительный — пробуем MEDIA_ROOT (для .py, .txt, PDF и т.д.).""" if not file_path: return None if os.path.isfile(file_path): return file_path try: media = getattr(settings, "MEDIA_ROOT", None) or "" if not media or os.path.isabs(file_path): return None base = file_path.lstrip("/") for candidate in ( os.path.normpath(os.path.join(media, base)), os.path.normpath(os.path.join(media, os.path.basename(file_path))), ): if os.path.isfile(candidate): return candidate except Exception: pass return None def _resolve_pdf_path(file_path: str) -> Optional[str]: """Проверяем путь к PDF; если относительный и файл не найден — пробуем MEDIA_ROOT.""" return _resolve_media_path(file_path) def _read_pdf_text(file_path: str) -> Optional[str]: """PDF по пути → конверт в .txt → читаем текст из .txt. Всегда пробуем извлечь текст.""" resolved = _resolve_pdf_path(file_path) if not resolved: logger.warning("AI check: PDF файл не найден: %s", file_path) return None file_path = resolved if os.path.splitext(file_path)[1].lower() != ".pdf": return None try: size = os.path.getsize(file_path) logger.info("AI check: читаем PDF path=%s size=%s", file_path, size) if size > MAX_TEXT_FILE_SIZE: logger.warning("AI check: пропуск PDF (слишком большой): %s", file_path) return None return _convert_pdf_to_txt_and_read(pdf_path=file_path) except Exception as e: logger.warning("AI check: не удалось прочитать PDF %s: %s", file_path, e) return None # Magic bytes для определения PDF по содержимому (если имя файла без расширения) PDF_MAGIC = b"%PDF-" def _extract_pdf_text_from_bytes(data: bytes) -> Optional[str]: """PDF из байтов → конверт в .txt → читаем текст из .txt.""" if not data or len(data) > MAX_TEXT_FILE_SIZE: return None if len(data) >= 5 and data[:5] != PDF_MAGIC: logger.warning("AI check: байты не похожи на PDF, len=%s", len(data)) return None logger.info("AI check: конвертируем PDF в .txt, размер %s bytes", len(data)) return _convert_pdf_to_txt_and_read(pdf_bytes=data) # Подпись для пустых или нечитаемых файлов, чтобы AI видел, что файл приложен, и мог попросить добавить код _EMPTY_OR_UNREADABLE_PLACEHOLDER = "(файл пустой или не удалось прочитать содержимое — попроси студента добавить код/текст в файл)" def _read_file_content_for_ai(file_path: str) -> Optional[Tuple[str, str]]: """ Возвращает (имя_файла, содержимое) для вставки в промпт. Если файл пустой или не удалось прочитать — возвращает (имя, подпись), чтобы AI видел приложенный файл. Изображения не возвращаются (идут в multimodal). """ # Разрешаем путь через MEDIA_ROOT при необходимости (PDF, .py, .txt и т.д.) if file_path: file_path = _resolve_media_path(file_path) or file_path if not file_path or not os.path.isfile(file_path): return None ext = os.path.splitext(file_path)[1].lower() name = os.path.basename(file_path) if ext in IMAGE_EXTENSIONS: return None # изображения обрабатываются отдельно if ext == '.pdf': content = _read_pdf_text(file_path) elif ext in TEXT_EXTENSIONS: content = _read_file_text(file_path) else: content = _read_file_text(file_path) # попробовать как текст if content and content.strip(): return (name, content.strip()) # Файл пустой или не удалось прочитать — всё равно добавляем в промпт с подписью return (name, _EMPTY_OR_UNREADABLE_PLACEHOLDER) def _read_image_as_data_url(file_path: str) -> Optional[str]: """ Читает файл с диска и возвращает data URL (data:image/...;base64,...) для изображений. Возвращает None, если файл не изображение, слишком большой или ошибка чтения. """ if not file_path or not os.path.isfile(file_path): return None ext = os.path.splitext(file_path)[1].lower() if ext not in IMAGE_EXTENSIONS: return None try: size = os.path.getsize(file_path) if size > MAX_IMAGE_SIZE: logger.warning("AI check: пропуск изображения (слишком большой %s bytes): %s", size, file_path) return None with open(file_path, 'rb') as f: data = f.read() b64 = base64.b64encode(data).decode('ascii') mime = 'image/jpeg' if ext in ('.jpg', '.jpeg') else f'image/{ext[1:]}' # png, gif, webp, bmp return f"data:{mime};base64,{b64}" except Exception as e: logger.warning("AI check: не удалось прочитать изображение %s: %s", file_path, e) return None def _get_default_agent(): """Получить агент по умолчанию для проверки ДЗ (из БД).""" try: from .models import HomeworkAIAgent return HomeworkAIAgent.objects.filter(is_default=True, is_active=True).first() except Exception: return None def _get_client_and_model(): """ Возвращает (client, model_name, agent) для проверки ДЗ. agent может быть None при fallback на OPENAI_*. Лимиты и параметры генерации задаются у провайдера (RouterAI и др.); в запросе передаём только промпт (messages). Приоритет: агент из БД (is_default=True) → настройки OPENAI_*. """ agent = _get_default_agent() if agent: api_key = (agent.api_key or '').strip() or getattr(settings, 'HOMEWORK_AI_API_KEY', None) or getattr(settings, 'OPENAI_API_KEY', None) if not api_key: logger.warning("ИИ-агент '%s' выбран, но API ключ не задан (агент.api_key или HOMEWORK_AI_API_KEY).", agent.name) return None, None, None try: from openai import OpenAI base_url = agent.get_base_url() if not base_url: logger.warning("ИИ-агент '%s': openai_url пустой.", agent.name) return None, None, None auth_header = getattr(agent, 'auth_header', None) or 'Bearer' client_kwargs = {'base_url': base_url, 'api_key': api_key} if auth_header == 'X-API-Key': client_kwargs['default_headers'] = {'X-API-Key': api_key} client = OpenAI(**client_kwargs) return client, agent.model_name, agent except ImportError: logger.error("OpenAI библиотека не установлена. Установите: pip install openai") return None, None, None # Fallback: настройки OpenAI api_key = getattr(settings, 'OPENAI_API_KEY', None) model = getattr(settings, 'OPENAI_MODEL', 'gpt-4o-mini') if not api_key: logger.warning("OPENAI_API_KEY не установлен. AI проверка недоступна.") return None, None, None try: from openai import OpenAI client = OpenAI(api_key=api_key) return client, model, None except ImportError: logger.error("OpenAI библиотека не установлена. Установите: pip install openai") return None, None, None def _normalize_usage(data: Any) -> Optional[Dict[str, int]]: """ Извлечь usage (prompt_tokens, completion_tokens, total_tokens) из ответа API. Поддерживает OpenAI-формат (usage.prompt_tokens и т.д.) и варианты (usage.total_tokens, tokens). """ if not data: return None usage = None if isinstance(data, dict): usage = data.get('usage') elif hasattr(data, 'usage'): usage = getattr(data, 'usage', None) if usage is None: return None if hasattr(usage, 'prompt_tokens'): pt = getattr(usage, 'prompt_tokens', None) ct = getattr(usage, 'completion_tokens', None) tt = getattr(usage, 'total_tokens', None) elif isinstance(usage, dict): pt = usage.get('prompt_tokens') ct = usage.get('completion_tokens') tt = usage.get('total_tokens') else: return None try: prompt_tokens = int(pt) if pt is not None else 0 completion_tokens = int(ct) if ct is not None else 0 total_tokens = int(tt) if tt is not None else (prompt_tokens + completion_tokens) return { 'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens, 'total_tokens': total_tokens or (prompt_tokens + completion_tokens), } except (TypeError, ValueError): return None def _get_agent_for_usage_stats(): """ Агент, которому приписать использование: по умолчанию тот, у кого is_default=True; если такого нет — первый активный (чтобы статистика обновлялась в любом случае). """ from .models import HomeworkAIAgent agent = HomeworkAIAgent.objects.filter(is_default=True, is_active=True).first() if agent: return agent return HomeworkAIAgent.objects.filter(is_active=True).order_by('order', 'name').first() def _increment_agent_usage(agent, usage: Optional[Dict[str, int]] = None): """Увеличить счётчик использований агента на 1 и при необходимости накопить токены.""" if not agent or not getattr(agent, 'pk', None): return try: from .models import HomeworkAIAgent # Сначала только usage_count, чтобы не падать при отсутствии полей токенов (до миграции) updated = HomeworkAIAgent.objects.filter(pk=agent.pk).update(usage_count=F('usage_count') + 1) if not updated: logger.warning("HomeworkAIAgent id=%s update usage_count matched 0 rows", agent.pk) return logger.info("HomeworkAIAgent id=%s usage_count += 1, tokens=%s", agent.pk, usage or 'n/a') # Токены — отдельным update (поля могут отсутствовать до миграции 0015) if usage and (usage.get('prompt_tokens') or usage.get('completion_tokens')): field_names = {f.name for f in HomeworkAIAgent._meta.get_fields()} if 'total_prompt_tokens' in field_names and 'total_completion_tokens' in field_names: HomeworkAIAgent.objects.filter(pk=agent.pk).update( total_prompt_tokens=F('total_prompt_tokens') + usage.get('prompt_tokens', 0), total_completion_tokens=F('total_completion_tokens') + usage.get('completion_tokens', 0), ) except Exception as e: logger.exception("Ошибка обновления usage_count для агента %s: %s", getattr(agent, 'pk'), e) class AICheckingService: """Сервис для автоматической проверки домашних заданий через ИИ (агенты из БД или OpenAI).""" def __init__(self): pass def check_submission( self, homework_title: str, homework_description: str, homework_max_score: int, submission_content: str, submission_files: list = None, homework_files: list = None, homework_file_paths: list = None, submission_file_paths: list = None, homework_file_contents: list = None, submission_file_contents: list = None, student_name: Optional[str] = None, ) -> Dict[str, any]: """ Проверка решения домашнего задания через AI. Отправляет: задание (текст + прикреплённые файлы/изображения если есть), решение (текст + прикреплённые файлы/изображения если есть). В ответ: комментарий и оценка 1–5. Args: homework_file_paths: Пути к файлам задания на диске (для чтения текста и изображений) submission_file_paths: Пути к файлам решения на диске homework_file_contents: [(имя_файла, содержимое_str_or_bytes), ...] — когда path недоступен (S3 и т.д.) submission_file_contents: [(имя_файла, содержимое_str_or_bytes), ...] student_name: Имя ученика — добавляется в начало промпта (например «Имя ученика: Кирилл»). """ agent = _get_default_agent() has_submission_files = bool(submission_file_paths or submission_file_contents or submission_files) if not submission_content and not submission_files and not has_submission_files: return { 'success': False, 'error': 'Решение пустое. Нет текста или файлов для проверки.' } homework_file_paths = homework_file_paths or [] submission_file_paths = submission_file_paths or [] homework_file_contents = homework_file_contents or [] submission_file_contents = submission_file_contents or [] # ─── Фаза 1: сначала полностью извлекаем текст из всех файлов, только потом отправляем в AI ─── logger.info( "AI check: фаза 1 — извлечение текста из файлов (homework paths=%s, contents=%s, submission paths=%s, contents=%s)", len(homework_file_paths), len(homework_file_contents), len(submission_file_paths), len(submission_file_contents), ) def _normalize_content(raw: Any, filename: str = "") -> str: if raw is None: return "" if isinstance(raw, bytes): # .py и код обычно в UTF-8; убираем BOM при наличии data = raw if data.startswith(b"\xef\xbb\xbf"): data = data[3:] for enc in ("utf-8", "cp1251", "latin-1"): try: return data.decode(enc) except UnicodeDecodeError: continue return data.decode("utf-8", errors="replace") return str(raw) def _content_to_text(filename: str, content: Any) -> str: """Для файлов из S3/contents: PDF из байтов извлекаем через pypdf+PyMuPDF, остальное (.py, .txt и т.д.) — как текст.""" if content is None: return "" is_pdf = (filename or "").lower().endswith(".pdf") or ( isinstance(content, bytes) and len(content) >= 5 and content[:5] == PDF_MAGIC ) if isinstance(content, bytes) and is_pdf: extracted = _extract_pdf_text_from_bytes(content) return (extracted or "").strip() return _normalize_content(content, filename) homework_file_texts: List[Tuple[str, str]] = [] for name, content in homework_file_contents: text = _content_to_text(name, content) if text.strip(): homework_file_texts.append((name, text.strip())) else: homework_file_texts.append((name, _EMPTY_OR_UNREADABLE_PLACEHOLDER)) for path in homework_file_paths: item = _read_file_content_for_ai(path) if item and not any(n == item[0] for n, _ in homework_file_texts): homework_file_texts.append(item) submission_file_texts: List[Tuple[str, str]] = [] for name, content in submission_file_contents: text = _content_to_text(name, content) if text.strip(): submission_file_texts.append((name, text.strip())) if (name or "").lower().endswith(".py"): logger.info("AI check: файл решения .py из contents «%s» — добавлен, %s символов", name, len(text)) else: submission_file_texts.append((name, _EMPTY_OR_UNREADABLE_PLACEHOLDER)) for path in submission_file_paths: item = _read_file_content_for_ai(path) if item and not any(n == item[0] for n, _ in submission_file_texts): submission_file_texts.append(item) name, text = item if (name or "").lower().endswith(".py") and text != _EMPTY_OR_UNREADABLE_PLACEHOLDER: logger.info("AI check: файл решения .py по пути «%s» — добавлен, %s символов", name, len(text)) # Фаза 1 завершена: извлечение из всех файлов выполнено. Дальше только формируем промпт и отправляем. logger.info( "AI check: фаза 1 завершена — задание: %s файлов (текст есть у %s), решение: %s файлов (текст есть у %s). Формируем промпт и отправляем в AI.", len(homework_file_texts), sum(1 for _, t in homework_file_texts if t != _EMPTY_OR_UNREADABLE_PLACEHOLDER), len(submission_file_texts), sum(1 for _, t in submission_file_texts if t != _EMPTY_OR_UNREADABLE_PLACEHOLDER), ) # ─── Фаза 2: извлечение завершено; проверяем unreadable, затем формируем промпт и только потом отправляем в AI ─── # Если не удалось извлечь текст из файлов — не вызываем AI, возвращаем черновик с сообщением homework_has_files = bool(homework_file_texts) homework_all_unreadable = homework_has_files and all( text == _EMPTY_OR_UNREADABLE_PLACEHOLDER for _, text in homework_file_texts ) submission_has_files = bool(submission_file_texts) submission_all_unreadable = submission_has_files and all( text == _EMPTY_OR_UNREADABLE_PLACEHOLDER for _, text in submission_file_texts ) if ( homework_has_files and homework_all_unreadable and not (homework_description or "").strip() and submission_has_files and submission_all_unreadable and not (submission_content or "").strip() ): logger.info("AI check: не удалось прочитать задание и решение, запрос к AI не отправляем") return { 'success': True, 'score': None, 'feedback': 'Не удалось прочитать задание и решение. Добавьте условие и решение в текстовом виде или в читаемых файлах (.txt, PDF с текстом, не сканы).', 'skipped_reason': 'unreadable_both', } if homework_has_files and homework_all_unreadable and not (homework_description or "").strip(): logger.info("AI check: не удалось прочитать задание (все файлы пустые/нечитаемые), запрос к AI не отправляем") return { 'success': True, 'score': None, 'feedback': 'Не удалось прочитать задание. Добавьте условие текстом в описание или приложите файл .txt или PDF с извлекаемым текстом (не скан).', 'skipped_reason': 'unreadable_assignment', } if submission_has_files and submission_all_unreadable and not (submission_content or "").strip(): logger.info("AI check: не удалось прочитать решение (все файлы пустые/нечитаемые), запрос к AI не отправляем") return { 'success': True, 'score': None, 'feedback': 'Не удалось прочитать решение. Попросите студента добавить текст решения в поле «Моё решение» или приложить файл .txt / PDF с извлекаемым текстом.', 'skipped_reason': 'unreadable_submission', } # Формируем промпт для AI (оценка 1–5). Режим разработки задаётся на агенте, по умолчанию выключен. dev_mode = getattr(agent, 'dev_mode', False) if agent else False # Имя ученика передаётся только в системный промпт (добавляется к вашему промпту из админки) prompt = self._build_prompt( homework_title=homework_title, homework_description=homework_description, homework_max_score=min(homework_max_score, 5), submission_content=submission_content, submission_files=submission_files or [], homework_files=homework_files or [], homework_file_texts=homework_file_texts, submission_file_texts=submission_file_texts, dev_mode=dev_mode, ) # Собираем изображения для multimodal: при наличии путей используем chat/completions с картинками image_data_urls: List[str] = [] for path in homework_file_paths: url = _read_image_as_data_url(path) if url: image_data_urls.append(url) for path in submission_file_paths: url = _read_image_as_data_url(path) if url: image_data_urls.append(url) has_images = bool(image_data_urls) # Вариант 2: всегда отправляем изображения как base64 data URL в chat/completions (multimodal) use_multimodal = has_images if has_images: logger.info( "AI check: загружаем %s изображений в запрос (base64 data URL, multimodal chat/completions)", len(image_data_urls), ) try: # OpenAI-совместимый API (chat/completions), в т.ч. multimodal (текст + изображения). RouterAI и др. client, model_name, used_agent = _get_client_and_model() if not client or not model_name: return { 'success': False, 'error': 'AI проверка недоступна. Добавьте ИИ-агента в админке (ДЗ → ИИ-агенты) или задайте OPENAI_API_KEY.' } # used_agent — агент, через которого реально пошёл запрос (при fallback на OPENAI_* он None) # Контент пользователя: текст или multimodal (текст + изображения) user_content: Any if use_multimodal and image_data_urls: user_content = [{'type': 'text', 'text': prompt}] for data_url in image_data_urls: user_content.append({'type': 'image_url', 'image_url': {'url': data_url}}) logger.info( "AI check (chat/completions multimodal): model=%s prompt_len=%s images=%s", model_name, len(prompt), len(image_data_urls) ) else: user_content = prompt logger.info( "AI check (chat/completions): model=%s prompt_len=%s prompt_preview=%s", model_name, len(prompt), (prompt[:400] + '...' if len(prompt) > 400 else prompt) ) # Системный промпт: из агента или встроенный по умолчанию default_system = ( 'Ты опытный преподаватель, который проверяет домашние задания студентов. ' 'Твоя задача — объективно оценить работу, указать на ошибки и дать конструктивную обратную связь. ' 'Оценка должна быть справедливой и мотивированной.' ) system_content = (getattr(used_agent, 'system_prompt', None) or '').strip() if used_agent else '' if not system_content: system_content = default_system dev_mode = getattr(used_agent, 'dev_mode', False) if used_agent else False if dev_mode: system_content += ( ' Режим отладки: в комментарии обязательно опиши содержимое каждого приложенного изображения ' '(что на фото/скриншоте/рисунке, какой текст виден), затем дай оценку и отзыв по заданию.' ) # Имя ученика (кто отправил ДЗ) в начало системного промпта display_name = (str(student_name).strip() if student_name else "") or "" if display_name: system_content = f"Имя ученика: {display_name}\n\n{system_content}" logger.info("AI check: в системный промпт добавлено имя ученика: %r", display_name[:80]) # Параметры генерации из агента (влияют на ответ модели) extra_kwargs = {} if used_agent: t = getattr(used_agent, 'temperature', None) if t is not None: extra_kwargs['temperature'] = float(t) p = getattr(used_agent, 'top_p', None) if p is not None: extra_kwargs['top_p'] = float(p) mt = getattr(used_agent, 'max_tokens', None) if mt is not None: extra_kwargs['max_tokens'] = int(mt) response = client.chat.completions.create( model=model_name, messages=[ {'role': 'system', 'content': system_content}, {'role': 'user', 'content': user_content} ], **extra_kwargs ) ai_response = response.choices[0].message.content.strip() usage = _normalize_usage(getattr(response, 'usage', None)) logger.info( "AI check (chat/completions) response: status=ok response_len=%s usage=%s", len(ai_response), usage ) max_for_parse = min(homework_max_score, 5) score, feedback = self._parse_ai_response(ai_response, max_for_parse) score = max(1, min(5, score)) agent_for_stats = _get_agent_for_usage_stats() _increment_agent_usage(agent_for_stats, usage) from .utils import feedback_to_html result = { 'success': True, 'score': score, 'feedback': feedback, 'feedback_html': feedback_to_html(feedback), 'raw_response': ai_response } if usage: result['usage'] = usage return result except requests.RequestException as e: resp = getattr(e, 'response', None) if resp is not None: try: err_body = resp.json() msg = err_body.get('message') or err_body.get('error', {}).get('message') or resp.text[:300] except Exception: msg = resp.text[:300] if getattr(resp, 'text', None) else str(e) if resp.status_code == 403: return { 'success': False, 'error': f'403 Доступ запрещён. RouterAI: проверьте ключ и баланс на routerai.ru/settings. {msg}' } else: msg = str(e) logger.error("Ошибка AI проверки (request): %s", msg, exc_info=True) return {'success': False, 'error': f'Ошибка при проверке через AI: {msg}'} except Exception as e: logger.error(f"Ошибка AI проверки: {str(e)}", exc_info=True) return { 'success': False, 'error': f'Ошибка при проверке через AI: {str(e)}' } def _build_prompt( self, homework_title: str, homework_description: str, homework_max_score: int, submission_content: str, submission_files: list = None, homework_files: list = None, homework_file_texts: list = None, submission_file_texts: list = None, dev_mode: bool = False, ) -> str: """Построение промпта для AI: задание, решение и содержимое приложенных файлов (PDF, .txt, .py и т.д.). Имя ученика добавляется в системный промпт при формировании запроса.""" homework_file_texts = homework_file_texts or [] submission_file_texts = submission_file_texts or [] parts = [ "ЗАДАНИЕ:", f"Название: {homework_title}", f"Описание: {homework_description or 'Нет текста.'}", ] for name, content in homework_file_texts: parts.append(f"\n--- Файл задания «{name}» ---\n{content}") parts.append("\nРЕШЕНИЕ СТУДЕНТА:") parts.append(submission_content or "Нет текста.") for name, content in submission_file_texts: parts.append(f"\n--- Файл решения «{name}» ---\n{content}") parts.append( "\n\n--- ИНСТРУКЦИЯ ДЛЯ ОТВЕТА ---\n" "Ответь строго в два поля.\n" "1) ОЦЕНКА: [целое число от 1 до 5].\n" "2) КОММЕНТАРИЙ: [развёрнутый комментарий для студента на русском]. " "Пиши полноценный отзыв: что сделано хорошо, что улучшить, конкретные замечания по коду или решению. " "Не используй краткие формулировки вроде «Совпало: да», «Верно: да» — только развёрнутый текст в поле КОММЕНТАРИЙ." ) if dev_mode: parts.append( "\n\n[РЕЖИМ ОТЛАДКИ] В комментарии обязательно опиши, что изображено на каждом приложенном изображении (фото, скриншот, рисунок): что на них видно, текст если есть, структура. Начни с описания изображений, затем дай оценку и общий комментарий по заданию." ) return "\n".join(parts) def _parse_ai_response(self, response: str, max_score: int) -> Tuple[int, str]: """ Парсинг ответа AI. Returns: tuple: (score, feedback) """ try: # Ищем оценку score = None feedback = "" lines = response.split('\n') in_comment = False for line in lines: line = line.strip() # Ищем оценку if line.startswith('ОЦЕНКА:') or 'ОЦЕНКА:' in line: try: # Извлекаем число parts = line.split('ОЦЕНКА:') if len(parts) > 1: score_str = parts[1].strip().split()[0] score = int(score_str) except (ValueError, IndexError): pass # Ищем комментарий if line.startswith('КОММЕНТАРИЙ:') or 'КОММЕНТАРИЙ:' in line: in_comment = True # Извлекаем текст после "КОММЕНТАРИЙ:" parts = line.split('КОММЕНТАРИЙ:') if len(parts) > 1: feedback += parts[1].strip() + '\n' continue if in_comment: feedback += line + '\n' # Если не нашли оценку, пытаемся найти число в начале ответа if score is None: for line in lines[:5]: # Проверяем первые 5 строк try: # Ищем первое число words = line.split() for word in words: if word.isdigit(): score = int(word) if 0 <= score <= max_score: break if score is not None: break except: pass # Если все еще не нашли, используем среднее значение (3 по шкале 1–5) if score is None: score = 3 logger.warning("Не удалось извлечь оценку из ответа AI. Используется 3.") # Ограничиваем оценку 1–5 score = max(1, min(score, min(max_score, 5))) # Если не нашли комментарий, используем весь ответ if not feedback.strip(): feedback = response return score, feedback.strip() except Exception as e: logger.error(f"Ошибка парсинга ответа AI: {str(e)}") # Возвращаем среднее значение и весь ответ как комментарий return max_score // 2, response # Глобальный экземпляр сервиса _ai_service = None def get_ai_service() -> AICheckingService: """Получить экземпляр сервиса AI проверки.""" global _ai_service if _ai_service is None: _ai_service = AICheckingService() return _ai_service