uchill/backend/apps/homework/ai_service.py

933 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Сервис для AI проверки домашних заданий.
Поддерживает ИИ-агенты из БД (RouterAI и др. OpenAI-совместимые)
и fallback на OPENAI_API_KEY из настроек.
RouterAI: https://routerai.ru/docs/reference
OpenAI-совместимый: POST https://routerai.ru/api/v1/chat/completions
Модели: openai/gpt-4o-mini, anthropic/claude-3-5-sonnet и др. — см. https://routerai.ru/models
Авторизация: Bearer <API-ключ из https://routerai.ru/settings/keys>
Поддерживает multimodal (текст + base64 изображения).
"""
import base64
import io
import logging
import os
import subprocess
import tempfile
import requests
from django.conf import settings
from django.db.models import F
from typing import Dict, List, Optional, Tuple, Any
logger = logging.getLogger(__name__)
# Максимальный размер одного изображения для отправки в AI (байты)
MAX_IMAGE_SIZE = 8 * 1024 * 1024 # 8 МБ
# Расширения изображений, которые отправляем в запрос
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}
# Расширения текстовых файлов: читаем содержимое и подставляем в промпт (задание/решение)
TEXT_EXTENSIONS = {'.txt', '.py', '.md', '.json', '.csv', '.xml', '.html', '.css', '.js', '.ts', '.jsx', '.tsx', '.c', '.cpp', '.h', '.java', '.kt', '.rs', '.go', '.rb', '.php', '.sql', '.yaml', '.yml', '.ini', '.cfg', '.log'}
# Максимальный размер текстового файла для чтения (байты)
MAX_TEXT_FILE_SIZE = 25 * 1024 * 1024 # 2 МБ
# Максимум символов из одного файла в промпт (чтобы не перегружать токены)
MAX_CHARS_PER_FILE = 800_000
def _read_file_text(file_path: str) -> Optional[str]:
"""
Читает текстовый файл (.txt, .py, .md и т.д.) и возвращает содержимое.
Кодировки: utf-8, затем cp1251, latin-1. Ограничение размера и длины — MAX_TEXT_FILE_SIZE, MAX_CHARS_PER_FILE.
"""
if not file_path or not os.path.isfile(file_path):
return None
ext = os.path.splitext(file_path)[1].lower()
if ext not in TEXT_EXTENSIONS:
return None
try:
size = os.path.getsize(file_path)
if size > MAX_TEXT_FILE_SIZE:
logger.warning("AI check: пропуск текстового файла (слишком большой %s bytes): %s", size, file_path)
return None
for encoding in ('utf-8', 'cp1251', 'latin-1'):
try:
with open(file_path, 'r', encoding=encoding) as f:
text = f.read(MAX_CHARS_PER_FILE + 1)
if len(text) > MAX_CHARS_PER_FILE:
text = text[:MAX_CHARS_PER_FILE] + "\n\n[... файл обрезан ...]"
return text
except UnicodeDecodeError:
continue
return None
except Exception as e:
logger.warning("AI check: не удалось прочитать текстовый файл %s: %s", file_path, e)
return None
def _extract_pdf_text_pymupdf(data: bytes) -> Optional[str]:
"""Fallback: извлечение текста из PDF через PyMuPDF (часто справляется там, где pypdf возвращает пусто)."""
if not data or len(data) > MAX_TEXT_FILE_SIZE:
return None
try:
import pymupdf
doc = pymupdf.open(stream=data, filetype="pdf")
parts = []
total = 0
for page in doc:
if total >= MAX_CHARS_PER_FILE:
parts.append("\n[... PDF обрезан ...]")
break
text = page.get_text() or ""
if total + len(text) > MAX_CHARS_PER_FILE:
text = text[: MAX_CHARS_PER_FILE - total]
parts.append(text)
total += len(text)
doc.close()
return "\n".join(parts).strip() or None
except ImportError:
logger.debug("AI check: pymupdf не установлен, fallback для PDF недоступен")
return None
except Exception as e:
logger.warning("AI check: PyMuPDF не смог извлечь текст из PDF: %s", e)
return None
def _extract_pdf_text_pdfminer(data: bytes) -> Optional[str]:
"""Извлечение текста из PDF через pdfminer.six (часто справляется с кодировками и структурой)."""
if not data or len(data) > MAX_TEXT_FILE_SIZE:
return None
try:
from pdfminer.high_level import extract_text
from pdfminer.layout import LAParams
text = extract_text(io.BytesIO(data), laparams=LAParams(line_margin=0.5, word_margin=0.1))
if not text or not text.strip():
return None
if len(text) > MAX_CHARS_PER_FILE:
text = text[:MAX_CHARS_PER_FILE] + "\n\n[... PDF обрезан ...]"
return text.strip()
except ImportError:
logger.debug("AI check: pdfminer.six не установлен")
return None
except Exception as e:
logger.warning("AI check: pdfminer исключение: %s", e)
return None
def _write_text_to_txt_and_read(text: str) -> Optional[str]:
"""Пишем текст во временный .txt и читаем обратно — результат «из файла»."""
if not text or not text.strip():
return None
try:
fd, path = tempfile.mkstemp(suffix=".txt", prefix="pdf2txt_", text=True)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(text[:MAX_CHARS_PER_FILE])
if len(text) > MAX_CHARS_PER_FILE:
f.write("\n\n[... обрезано ...]")
with open(path, "r", encoding="utf-8") as f:
return f.read().strip() or None
finally:
try:
os.unlink(path)
except OSError:
pass
except Exception as e:
logger.warning("AI check: не удалось записать/прочитать .txt: %s", e)
return text.strip()
def _convert_pdf_to_txt_and_read(pdf_path: Optional[str] = None, pdf_bytes: Optional[bytes] = None) -> Optional[str]:
"""
Получаем PDF → сразу конвертируем в .txt файл → читаем текст из .txt.
Так текст всегда получаем из файла. Порядок: pdftotext (если есть) → pypdf → PyMuPDF → pdfminer.
"""
if pdf_path and not os.path.isfile(pdf_path):
pdf_path = None
if not pdf_path and not pdf_bytes:
return None
if pdf_bytes:
if len(pdf_bytes) > MAX_TEXT_FILE_SIZE:
return None
if len(pdf_bytes) >= 5 and pdf_bytes[:5] != b"%PDF-":
logger.warning("AI check: байты не похожи на PDF, len=%s", len(pdf_bytes))
return None
# Если только байты — пишем во временный PDF, дальше работаем с путём
temp_pdf_path = None
if not pdf_path and pdf_bytes:
try:
fd, temp_pdf_path = tempfile.mkstemp(suffix=".pdf")
os.close(fd)
with open(temp_pdf_path, "wb") as f:
f.write(pdf_bytes)
pdf_path = temp_pdf_path
except Exception as e:
logger.warning("AI check: не удалось записать PDF во временный файл: %s", e)
return None
txt_path = None
try:
# 1) pdftotext (poppler) — часто надёжнее библиотек
try:
fd, txt_path = tempfile.mkstemp(suffix=".txt", prefix="pdf2txt_", text=True)
os.close(fd)
r = subprocess.run(
["pdftotext", "-enc", "UTF-8", pdf_path, txt_path],
capture_output=True,
timeout=60,
cwd=os.path.dirname(pdf_path) or None,
)
if r.returncode == 0 and os.path.isfile(txt_path):
with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
text = f.read(MAX_CHARS_PER_FILE + 1)
if len(text) > MAX_CHARS_PER_FILE:
text = text[:MAX_CHARS_PER_FILE] + "\n\n[... обрезано ...]"
if text.strip():
logger.info("AI check: pdftotext сконвертировал PDF в .txt, прочитано %s символов", len(text))
return text.strip()
logger.info("AI check: pdftotext вернул код 0, но .txt пустой (возможно скан без текстового слоя)")
else:
stderr = (r.stderr or b"").decode("utf-8", errors="replace").strip() if r.stderr else ""
logger.warning("AI check: pdftotext не сработал returncode=%s stderr=%s", r.returncode, stderr[:500])
except FileNotFoundError:
logger.warning("AI check: pdftotext не найден в PATH (в Docker установите poppler-utils)")
except Exception as e:
logger.warning("AI check: pdftotext исключение: %s", e)
finally:
if txt_path and os.path.isfile(txt_path):
try:
os.unlink(txt_path)
except OSError:
pass
txt_path = None
# 2) Извлечение библиотеками и запись в .txt — пробуем все по очереди
with open(pdf_path, "rb") as f:
data = f.read()
logger.info("AI check: PDF прочитан с диска, %s bytes, пробуем извлечь текст", len(data))
raw_text = None
# pdfminer часто лучше для PDF с кириллицей и сложной вёрсткой
raw_text = _extract_pdf_text_pdfminer(data)
if raw_text and raw_text.strip():
logger.info("AI check: pdfminer извлёк %s символов", len(raw_text))
else:
logger.info("AI check: pdfminer вернул пустой текст")
if not (raw_text and raw_text.strip()):
raw_text = _extract_pdf_text_pymupdf(data)
if raw_text and raw_text.strip():
logger.info("AI check: PyMuPDF извлёк %s символов", len(raw_text))
else:
logger.info("AI check: PyMuPDF вернул пустой текст")
if not (raw_text and raw_text.strip()):
try:
from pypdf import PdfReader
reader = PdfReader(io.BytesIO(data))
parts = []
total = 0
for page in reader.pages:
if total >= MAX_CHARS_PER_FILE:
break
t = page.extract_text() or ""
if total + len(t) > MAX_CHARS_PER_FILE:
t = t[: MAX_CHARS_PER_FILE - total]
parts.append(t)
total += len(t)
raw_text = "\n".join(parts).strip() or None
if raw_text:
logger.info("AI check: pypdf извлёк %s символов", len(raw_text))
else:
logger.info("AI check: pypdf вернул пустой текст (страниц: %s)", len(reader.pages))
except Exception as e:
logger.warning("AI check: pypdf исключение: %s", e)
if not (raw_text and raw_text.strip()):
logger.warning(
"AI check: ни один метод не извлек текст. Возможные причины: PDF — скан (нет текстового слоя), защищён, или нестандартная кодировка. Путь: %s",
pdf_path,
)
return None
# Конверт: пишем в .txt и читаем из файла
return _write_text_to_txt_and_read(raw_text)
finally:
if temp_pdf_path and os.path.isfile(temp_pdf_path):
try:
os.unlink(temp_pdf_path)
except OSError:
pass
def _resolve_media_path(file_path: str) -> Optional[str]:
"""Проверяем путь к файлу; если не найден и путь относительный — пробуем MEDIA_ROOT (для .py, .txt, PDF и т.д.)."""
if not file_path:
return None
if os.path.isfile(file_path):
return file_path
try:
media = getattr(settings, "MEDIA_ROOT", None) or ""
if not media or os.path.isabs(file_path):
return None
base = file_path.lstrip("/")
for candidate in (
os.path.normpath(os.path.join(media, base)),
os.path.normpath(os.path.join(media, os.path.basename(file_path))),
):
if os.path.isfile(candidate):
return candidate
except Exception:
pass
return None
def _resolve_pdf_path(file_path: str) -> Optional[str]:
"""Проверяем путь к PDF; если относительный и файл не найден — пробуем MEDIA_ROOT."""
return _resolve_media_path(file_path)
def _read_pdf_text(file_path: str) -> Optional[str]:
"""PDF по пути → конверт в .txt → читаем текст из .txt. Всегда пробуем извлечь текст."""
resolved = _resolve_pdf_path(file_path)
if not resolved:
logger.warning("AI check: PDF файл не найден: %s", file_path)
return None
file_path = resolved
if os.path.splitext(file_path)[1].lower() != ".pdf":
return None
try:
size = os.path.getsize(file_path)
logger.info("AI check: читаем PDF path=%s size=%s", file_path, size)
if size > MAX_TEXT_FILE_SIZE:
logger.warning("AI check: пропуск PDF (слишком большой): %s", file_path)
return None
return _convert_pdf_to_txt_and_read(pdf_path=file_path)
except Exception as e:
logger.warning("AI check: не удалось прочитать PDF %s: %s", file_path, e)
return None
# Magic bytes для определения PDF по содержимому (если имя файла без расширения)
PDF_MAGIC = b"%PDF-"
def _extract_pdf_text_from_bytes(data: bytes) -> Optional[str]:
"""PDF из байтов → конверт в .txt → читаем текст из .txt."""
if not data or len(data) > MAX_TEXT_FILE_SIZE:
return None
if len(data) >= 5 and data[:5] != PDF_MAGIC:
logger.warning("AI check: байты не похожи на PDF, len=%s", len(data))
return None
logger.info("AI check: конвертируем PDF в .txt, размер %s bytes", len(data))
return _convert_pdf_to_txt_and_read(pdf_bytes=data)
# Подпись для пустых или нечитаемых файлов, чтобы AI видел, что файл приложен, и мог попросить добавить код
_EMPTY_OR_UNREADABLE_PLACEHOLDER = "(файл пустой или не удалось прочитать содержимое — попроси студента добавить код/текст в файл)"
def _read_file_content_for_ai(file_path: str) -> Optional[Tuple[str, str]]:
"""
Возвращает (имя_файла, содержимое) для вставки в промпт.
Если файл пустой или не удалось прочитать — возвращает (имя, подпись), чтобы AI видел приложенный файл.
Изображения не возвращаются (идут в multimodal).
"""
# Разрешаем путь через MEDIA_ROOT при необходимости (PDF, .py, .txt и т.д.)
if file_path:
file_path = _resolve_media_path(file_path) or file_path
if not file_path or not os.path.isfile(file_path):
return None
ext = os.path.splitext(file_path)[1].lower()
name = os.path.basename(file_path)
if ext in IMAGE_EXTENSIONS:
return None # изображения обрабатываются отдельно
if ext == '.pdf':
content = _read_pdf_text(file_path)
elif ext in TEXT_EXTENSIONS:
content = _read_file_text(file_path)
else:
content = _read_file_text(file_path) # попробовать как текст
if content and content.strip():
return (name, content.strip())
# Файл пустой или не удалось прочитать — всё равно добавляем в промпт с подписью
return (name, _EMPTY_OR_UNREADABLE_PLACEHOLDER)
def _read_image_as_data_url(file_path: str) -> Optional[str]:
"""
Читает файл с диска и возвращает data URL (data:image/...;base64,...) для изображений.
Возвращает None, если файл не изображение, слишком большой или ошибка чтения.
"""
if not file_path or not os.path.isfile(file_path):
return None
ext = os.path.splitext(file_path)[1].lower()
if ext not in IMAGE_EXTENSIONS:
return None
try:
size = os.path.getsize(file_path)
if size > MAX_IMAGE_SIZE:
logger.warning("AI check: пропуск изображения (слишком большой %s bytes): %s", size, file_path)
return None
with open(file_path, 'rb') as f:
data = f.read()
b64 = base64.b64encode(data).decode('ascii')
mime = 'image/jpeg' if ext in ('.jpg', '.jpeg') else f'image/{ext[1:]}' # png, gif, webp, bmp
return f"data:{mime};base64,{b64}"
except Exception as e:
logger.warning("AI check: не удалось прочитать изображение %s: %s", file_path, e)
return None
def _get_default_agent():
"""Получить агент по умолчанию для проверки ДЗ (из БД)."""
try:
from .models import HomeworkAIAgent
return HomeworkAIAgent.objects.filter(is_default=True, is_active=True).first()
except Exception:
return None
def _get_client_and_model():
"""
Возвращает (client, model_name, agent) для проверки ДЗ.
agent может быть None при fallback на OPENAI_*.
Лимиты и параметры генерации задаются у провайдера (RouterAI и др.); в запросе передаём только промпт (messages).
Приоритет: агент из БД (is_default=True) → настройки OPENAI_*.
"""
agent = _get_default_agent()
if agent:
api_key = (agent.api_key or '').strip() or getattr(settings, 'HOMEWORK_AI_API_KEY', None) or getattr(settings, 'OPENAI_API_KEY', None)
if not api_key:
logger.warning("ИИ-агент '%s' выбран, но API ключ не задан (агент.api_key или HOMEWORK_AI_API_KEY).", agent.name)
return None, None, None
try:
from openai import OpenAI
base_url = agent.get_base_url()
if not base_url:
logger.warning("ИИ-агент '%s': openai_url пустой.", agent.name)
return None, None, None
auth_header = getattr(agent, 'auth_header', None) or 'Bearer'
client_kwargs = {'base_url': base_url, 'api_key': api_key}
if auth_header == 'X-API-Key':
client_kwargs['default_headers'] = {'X-API-Key': api_key}
client = OpenAI(**client_kwargs)
return client, agent.model_name, agent
except ImportError:
logger.error("OpenAI библиотека не установлена. Установите: pip install openai")
return None, None, None
# Fallback: настройки OpenAI
api_key = getattr(settings, 'OPENAI_API_KEY', None)
model = getattr(settings, 'OPENAI_MODEL', 'gpt-4o-mini')
if not api_key:
logger.warning("OPENAI_API_KEY не установлен. AI проверка недоступна.")
return None, None, None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
return client, model, None
except ImportError:
logger.error("OpenAI библиотека не установлена. Установите: pip install openai")
return None, None, None
def _normalize_usage(data: Any) -> Optional[Dict[str, int]]:
"""
Извлечь usage (prompt_tokens, completion_tokens, total_tokens) из ответа API.
Поддерживает OpenAI-формат (usage.prompt_tokens и т.д.) и варианты (usage.total_tokens, tokens).
"""
if not data:
return None
usage = None
if isinstance(data, dict):
usage = data.get('usage')
elif hasattr(data, 'usage'):
usage = getattr(data, 'usage', None)
if usage is None:
return None
if hasattr(usage, 'prompt_tokens'):
pt = getattr(usage, 'prompt_tokens', None)
ct = getattr(usage, 'completion_tokens', None)
tt = getattr(usage, 'total_tokens', None)
elif isinstance(usage, dict):
pt = usage.get('prompt_tokens')
ct = usage.get('completion_tokens')
tt = usage.get('total_tokens')
else:
return None
try:
prompt_tokens = int(pt) if pt is not None else 0
completion_tokens = int(ct) if ct is not None else 0
total_tokens = int(tt) if tt is not None else (prompt_tokens + completion_tokens)
return {
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens,
'total_tokens': total_tokens or (prompt_tokens + completion_tokens),
}
except (TypeError, ValueError):
return None
def _get_agent_for_usage_stats():
"""
Агент, которому приписать использование: по умолчанию тот, у кого is_default=True;
если такого нет — первый активный (чтобы статистика обновлялась в любом случае).
"""
from .models import HomeworkAIAgent
agent = HomeworkAIAgent.objects.filter(is_default=True, is_active=True).first()
if agent:
return agent
return HomeworkAIAgent.objects.filter(is_active=True).order_by('order', 'name').first()
def _increment_agent_usage(agent, usage: Optional[Dict[str, int]] = None):
"""Увеличить счётчик использований агента на 1 и при необходимости накопить токены."""
if not agent or not getattr(agent, 'pk', None):
return
try:
from .models import HomeworkAIAgent
# Сначала только usage_count, чтобы не падать при отсутствии полей токенов (до миграции)
updated = HomeworkAIAgent.objects.filter(pk=agent.pk).update(usage_count=F('usage_count') + 1)
if not updated:
logger.warning("HomeworkAIAgent id=%s update usage_count matched 0 rows", agent.pk)
return
logger.info("HomeworkAIAgent id=%s usage_count += 1, tokens=%s", agent.pk, usage or 'n/a')
# Токены — отдельным update (поля могут отсутствовать до миграции 0015)
if usage and (usage.get('prompt_tokens') or usage.get('completion_tokens')):
field_names = {f.name for f in HomeworkAIAgent._meta.get_fields()}
if 'total_prompt_tokens' in field_names and 'total_completion_tokens' in field_names:
HomeworkAIAgent.objects.filter(pk=agent.pk).update(
total_prompt_tokens=F('total_prompt_tokens') + usage.get('prompt_tokens', 0),
total_completion_tokens=F('total_completion_tokens') + usage.get('completion_tokens', 0),
)
except Exception as e:
logger.exception("Ошибка обновления usage_count для агента %s: %s", getattr(agent, 'pk'), e)
class AICheckingService:
"""Сервис для автоматической проверки домашних заданий через ИИ (агенты из БД или OpenAI)."""
def __init__(self):
pass
def check_submission(
self,
homework_title: str,
homework_description: str,
homework_max_score: int,
submission_content: str,
submission_files: list = None,
homework_files: list = None,
homework_file_paths: list = None,
submission_file_paths: list = None,
homework_file_contents: list = None,
submission_file_contents: list = None,
student_name: Optional[str] = None,
) -> Dict[str, any]:
"""
Проверка решения домашнего задания через AI.
Отправляет: задание (текст + прикреплённые файлы/изображения если есть),
решение (текст + прикреплённые файлы/изображения если есть).
В ответ: комментарий и оценка 15.
Args:
homework_file_paths: Пути к файлам задания на диске (для чтения текста и изображений)
submission_file_paths: Пути к файлам решения на диске
homework_file_contents: [(имя_файла, содержимое_str_or_bytes), ...] — когда path недоступен (S3 и т.д.)
submission_file_contents: [(имя_файла, содержимое_str_or_bytes), ...]
student_name: Имя ученика — добавляется в начало промпта (например «Имя ученика: Кирилл»).
"""
agent = _get_default_agent()
has_submission_files = bool(submission_file_paths or submission_file_contents or submission_files)
if not submission_content and not submission_files and not has_submission_files:
return {
'success': False,
'error': 'Решение пустое. Нет текста или файлов для проверки.'
}
homework_file_paths = homework_file_paths or []
submission_file_paths = submission_file_paths or []
homework_file_contents = homework_file_contents or []
submission_file_contents = submission_file_contents or []
# ─── Фаза 1: сначала полностью извлекаем текст из всех файлов, только потом отправляем в AI ───
logger.info(
"AI check: фаза 1 — извлечение текста из файлов (homework paths=%s, contents=%s, submission paths=%s, contents=%s)",
len(homework_file_paths), len(homework_file_contents), len(submission_file_paths), len(submission_file_contents),
)
def _normalize_content(raw: Any, filename: str = "") -> str:
if raw is None:
return ""
if isinstance(raw, bytes):
# .py и код обычно в UTF-8; убираем BOM при наличии
data = raw
if data.startswith(b"\xef\xbb\xbf"):
data = data[3:]
for enc in ("utf-8", "cp1251", "latin-1"):
try:
return data.decode(enc)
except UnicodeDecodeError:
continue
return data.decode("utf-8", errors="replace")
return str(raw)
def _content_to_text(filename: str, content: Any) -> str:
"""Для файлов из S3/contents: PDF из байтов извлекаем через pypdf+PyMuPDF, остальное (.py, .txt и т.д.) — как текст."""
if content is None:
return ""
is_pdf = (filename or "").lower().endswith(".pdf") or (
isinstance(content, bytes) and len(content) >= 5 and content[:5] == PDF_MAGIC
)
if isinstance(content, bytes) and is_pdf:
extracted = _extract_pdf_text_from_bytes(content)
return (extracted or "").strip()
return _normalize_content(content, filename)
homework_file_texts: List[Tuple[str, str]] = []
for name, content in homework_file_contents:
text = _content_to_text(name, content)
if text.strip():
homework_file_texts.append((name, text.strip()))
else:
homework_file_texts.append((name, _EMPTY_OR_UNREADABLE_PLACEHOLDER))
for path in homework_file_paths:
item = _read_file_content_for_ai(path)
if item and not any(n == item[0] for n, _ in homework_file_texts):
homework_file_texts.append(item)
submission_file_texts: List[Tuple[str, str]] = []
for name, content in submission_file_contents:
text = _content_to_text(name, content)
if text.strip():
submission_file_texts.append((name, text.strip()))
if (name or "").lower().endswith(".py"):
logger.info("AI check: файл решения .py из contents «%s» — добавлен, %s символов", name, len(text))
else:
submission_file_texts.append((name, _EMPTY_OR_UNREADABLE_PLACEHOLDER))
for path in submission_file_paths:
item = _read_file_content_for_ai(path)
if item and not any(n == item[0] for n, _ in submission_file_texts):
submission_file_texts.append(item)
name, text = item
if (name or "").lower().endswith(".py") and text != _EMPTY_OR_UNREADABLE_PLACEHOLDER:
logger.info("AI check: файл решения .py по пути «%s» — добавлен, %s символов", name, len(text))
# Фаза 1 завершена: извлечение из всех файлов выполнено. Дальше только формируем промпт и отправляем.
logger.info(
"AI check: фаза 1 завершена — задание: %s файлов (текст есть у %s), решение: %s файлов (текст есть у %s). Формируем промпт и отправляем в AI.",
len(homework_file_texts),
sum(1 for _, t in homework_file_texts if t != _EMPTY_OR_UNREADABLE_PLACEHOLDER),
len(submission_file_texts),
sum(1 for _, t in submission_file_texts if t != _EMPTY_OR_UNREADABLE_PLACEHOLDER),
)
# ─── Фаза 2: извлечение завершено; проверяем unreadable, затем формируем промпт и только потом отправляем в AI ───
# Если не удалось извлечь текст из файлов — не вызываем AI, возвращаем черновик с сообщением
homework_has_files = bool(homework_file_texts)
homework_all_unreadable = homework_has_files and all(
text == _EMPTY_OR_UNREADABLE_PLACEHOLDER for _, text in homework_file_texts
)
submission_has_files = bool(submission_file_texts)
submission_all_unreadable = submission_has_files and all(
text == _EMPTY_OR_UNREADABLE_PLACEHOLDER for _, text in submission_file_texts
)
if (
homework_has_files and homework_all_unreadable and not (homework_description or "").strip()
and submission_has_files and submission_all_unreadable and not (submission_content or "").strip()
):
logger.info("AI check: не удалось прочитать задание и решение, запрос к AI не отправляем")
return {
'success': True,
'score': None,
'feedback': 'Не удалось прочитать задание и решение. Добавьте условие и решение в текстовом виде или в читаемых файлах (.txt, PDF с текстом, не сканы).',
'skipped_reason': 'unreadable_both',
}
if homework_has_files and homework_all_unreadable and not (homework_description or "").strip():
logger.info("AI check: не удалось прочитать задание (все файлы пустые/нечитаемые), запрос к AI не отправляем")
return {
'success': True,
'score': None,
'feedback': 'Не удалось прочитать задание. Добавьте условие текстом в описание или приложите файл .txt или PDF с извлекаемым текстом (не скан).',
'skipped_reason': 'unreadable_assignment',
}
if submission_has_files and submission_all_unreadable and not (submission_content or "").strip():
logger.info("AI check: не удалось прочитать решение (все файлы пустые/нечитаемые), запрос к AI не отправляем")
return {
'success': True,
'score': None,
'feedback': 'Не удалось прочитать решение. Попросите студента добавить текст решения в поле «Моё решение» или приложить файл .txt / PDF с извлекаемым текстом.',
'skipped_reason': 'unreadable_submission',
}
# Формируем промпт для AI (оценка 15). Режим разработки задаётся на агенте, по умолчанию выключен.
dev_mode = getattr(agent, 'dev_mode', False) if agent else False
# Имя ученика передаётся только в системный промпт (добавляется к вашему промпту из админки)
prompt = self._build_prompt(
homework_title=homework_title,
homework_description=homework_description,
homework_max_score=min(homework_max_score, 5),
submission_content=submission_content,
submission_files=submission_files or [],
homework_files=homework_files or [],
homework_file_texts=homework_file_texts,
submission_file_texts=submission_file_texts,
dev_mode=dev_mode,
)
# Собираем изображения для multimodal: при наличии путей используем chat/completions с картинками
image_data_urls: List[str] = []
for path in homework_file_paths:
url = _read_image_as_data_url(path)
if url:
image_data_urls.append(url)
for path in submission_file_paths:
url = _read_image_as_data_url(path)
if url:
image_data_urls.append(url)
has_images = bool(image_data_urls)
# Вариант 2: всегда отправляем изображения как base64 data URL в chat/completions (multimodal)
use_multimodal = has_images
if has_images:
logger.info(
"AI check: загружаем %s изображений в запрос (base64 data URL, multimodal chat/completions)",
len(image_data_urls),
)
try:
# OpenAI-совместимый API (chat/completions), в т.ч. multimodal (текст + изображения). RouterAI и др.
client, model_name, used_agent = _get_client_and_model()
if not client or not model_name:
return {
'success': False,
'error': 'AI проверка недоступна. Добавьте ИИ-агента в админке (ДЗ → ИИ-агенты) или задайте OPENAI_API_KEY.'
}
# used_agent — агент, через которого реально пошёл запрос (при fallback на OPENAI_* он None)
# Контент пользователя: текст или multimodal (текст + изображения)
user_content: Any
if use_multimodal and image_data_urls:
user_content = [{'type': 'text', 'text': prompt}]
for data_url in image_data_urls:
user_content.append({'type': 'image_url', 'image_url': {'url': data_url}})
logger.info(
"AI check (chat/completions multimodal): model=%s prompt_len=%s images=%s",
model_name, len(prompt), len(image_data_urls)
)
else:
user_content = prompt
logger.info(
"AI check (chat/completions): model=%s prompt_len=%s prompt_preview=%s",
model_name, len(prompt), (prompt[:400] + '...' if len(prompt) > 400 else prompt)
)
# Системный промпт: из агента или встроенный по умолчанию
default_system = (
'Ты опытный преподаватель, который проверяет домашние задания студентов. '
'Твоя задача — объективно оценить работу, указать на ошибки и дать конструктивную обратную связь. '
'Оценка должна быть справедливой и мотивированной.'
)
system_content = (getattr(used_agent, 'system_prompt', None) or '').strip() if used_agent else ''
if not system_content:
system_content = default_system
dev_mode = getattr(used_agent, 'dev_mode', False) if used_agent else False
if dev_mode:
system_content += (
' Режим отладки: в комментарии обязательно опиши содержимое каждого приложенного изображения '
'(что на фото/скриншоте/рисунке, какой текст виден), затем дай оценку и отзыв по заданию.'
)
# Имя ученика (кто отправил ДЗ) в начало системного промпта
display_name = (str(student_name).strip() if student_name else "") or ""
if display_name:
system_content = f"Имя ученика: {display_name}\n\n{system_content}"
logger.info("AI check: в системный промпт добавлено имя ученика: %r", display_name[:80])
# Параметры генерации из агента (влияют на ответ модели)
extra_kwargs = {}
if used_agent:
t = getattr(used_agent, 'temperature', None)
if t is not None:
extra_kwargs['temperature'] = float(t)
p = getattr(used_agent, 'top_p', None)
if p is not None:
extra_kwargs['top_p'] = float(p)
mt = getattr(used_agent, 'max_tokens', None)
if mt is not None:
extra_kwargs['max_tokens'] = int(mt)
response = client.chat.completions.create(
model=model_name,
messages=[
{'role': 'system', 'content': system_content},
{'role': 'user', 'content': user_content}
],
**extra_kwargs
)
ai_response = response.choices[0].message.content.strip()
usage = _normalize_usage(getattr(response, 'usage', None))
logger.info(
"AI check (chat/completions) response: status=ok response_len=%s usage=%s",
len(ai_response), usage
)
max_for_parse = min(homework_max_score, 5)
score, feedback = self._parse_ai_response(ai_response, max_for_parse)
score = max(1, min(5, score))
agent_for_stats = _get_agent_for_usage_stats()
_increment_agent_usage(agent_for_stats, usage)
from .utils import feedback_to_html
result = {
'success': True,
'score': score,
'feedback': feedback,
'feedback_html': feedback_to_html(feedback),
'raw_response': ai_response
}
if usage:
result['usage'] = usage
return result
except requests.RequestException as e:
resp = getattr(e, 'response', None)
if resp is not None:
try:
err_body = resp.json()
msg = err_body.get('message') or err_body.get('error', {}).get('message') or resp.text[:300]
except Exception:
msg = resp.text[:300] if getattr(resp, 'text', None) else str(e)
if resp.status_code == 403:
return {
'success': False,
'error': f'403 Доступ запрещён. RouterAI: проверьте ключ и баланс на routerai.ru/settings. {msg}'
}
else:
msg = str(e)
logger.error("Ошибка AI проверки (request): %s", msg, exc_info=True)
return {'success': False, 'error': f'Ошибка при проверке через AI: {msg}'}
except Exception as e:
logger.error(f"Ошибка AI проверки: {str(e)}", exc_info=True)
return {
'success': False,
'error': f'Ошибка при проверке через AI: {str(e)}'
}
def _build_prompt(
self,
homework_title: str,
homework_description: str,
homework_max_score: int,
submission_content: str,
submission_files: list = None,
homework_files: list = None,
homework_file_texts: list = None,
submission_file_texts: list = None,
dev_mode: bool = False,
) -> str:
"""Построение промпта для AI: задание, решение и содержимое приложенных файлов (PDF, .txt, .py и т.д.). Имя ученика добавляется в системный промпт при формировании запроса."""
homework_file_texts = homework_file_texts or []
submission_file_texts = submission_file_texts or []
parts = [
"ЗАДАНИЕ:",
f"Название: {homework_title}",
f"Описание: {homework_description or 'Нет текста.'}",
]
for name, content in homework_file_texts:
parts.append(f"\n--- Файл задания «{name}» ---\n{content}")
parts.append("\nРЕШЕНИЕ СТУДЕНТА:")
parts.append(submission_content or "Нет текста.")
for name, content in submission_file_texts:
parts.append(f"\n--- Файл решения «{name}» ---\n{content}")
parts.append(
"\n\n--- ИНСТРУКЦИЯ ДЛЯ ОТВЕТА ---\n"
"Ответь строго в два поля.\n"
"1) ОЦЕНКА: [целое число от 1 до 5].\n"
"2) КОММЕНТАРИЙ: [развёрнутый комментарий для студента на русском]. "
"Пиши полноценный отзыв: что сделано хорошо, что улучшить, конкретные замечания по коду или решению. "
"Не используй краткие формулировки вроде «Совпало: да», «Верно: да» — только развёрнутый текст в поле КОММЕНТАРИЙ."
)
if dev_mode:
parts.append(
"\n\n[РЕЖИМ ОТЛАДКИ] В комментарии обязательно опиши, что изображено на каждом приложенном изображении (фото, скриншот, рисунок): что на них видно, текст если есть, структура. Начни с описания изображений, затем дай оценку и общий комментарий по заданию."
)
return "\n".join(parts)
def _parse_ai_response(self, response: str, max_score: int) -> Tuple[int, str]:
"""
Парсинг ответа AI.
Returns:
tuple: (score, feedback)
"""
try:
# Ищем оценку
score = None
feedback = ""
lines = response.split('\n')
in_comment = False
for line in lines:
line = line.strip()
# Ищем оценку
if line.startswith('ОЦЕНКА:') or 'ОЦЕНКА:' in line:
try:
# Извлекаем число
parts = line.split('ОЦЕНКА:')
if len(parts) > 1:
score_str = parts[1].strip().split()[0]
score = int(score_str)
except (ValueError, IndexError):
pass
# Ищем комментарий
if line.startswith('КОММЕНТАРИЙ:') or 'КОММЕНТАРИЙ:' in line:
in_comment = True
# Извлекаем текст после "КОММЕНТАРИЙ:"
parts = line.split('КОММЕНТАРИЙ:')
if len(parts) > 1:
feedback += parts[1].strip() + '\n'
continue
if in_comment:
feedback += line + '\n'
# Если не нашли оценку, пытаемся найти число в начале ответа
if score is None:
for line in lines[:5]: # Проверяем первые 5 строк
try:
# Ищем первое число
words = line.split()
for word in words:
if word.isdigit():
score = int(word)
if 0 <= score <= max_score:
break
if score is not None:
break
except:
pass
# Если все еще не нашли, используем среднее значение (3 по шкале 15)
if score is None:
score = 3
logger.warning("Не удалось извлечь оценку из ответа AI. Используется 3.")
# Ограничиваем оценку 15
score = max(1, min(score, min(max_score, 5)))
# Если не нашли комментарий, используем весь ответ
if not feedback.strip():
feedback = response
return score, feedback.strip()
except Exception as e:
logger.error(f"Ошибка парсинга ответа AI: {str(e)}")
# Возвращаем среднее значение и весь ответ как комментарий
return max_score // 2, response
# Глобальный экземпляр сервиса
_ai_service = None
def get_ai_service() -> AICheckingService:
"""Получить экземпляр сервиса AI проверки."""
global _ai_service
if _ai_service is None:
_ai_service = AICheckingService()
return _ai_service