# ТЗ: AI Sales Bot v3.0 (Python, standalone) **Версия:** 3.0 **Дата:** 2026-03-29 **Для:** Claude Code / Codex **Язык:** Python 3.11+ **Важно:** Бот работает автономно. НЕ использует OpenClaw, Paperclip или другие платформы. Все промпты, контекст и логика — внутри Python-кода. --- ## Изменения относительно v2.0 | # | Что изменено | Было (v2.0) | Стало (v3.0) | Причина | |---|---|---|---|---| | 1 | БД | SQLite | PostgreSQL + asyncpg + Alembic | SQLite блокируется при конкурентных webhook'ах → потеря лидов | | 2 | Обработка сообщений | Webhook → Router → LLM → Response (синхронно) | Webhook → Queue → Worker → Router (async pipeline) | LLM-вызов 3-15 сек, Авито webhook timeout < 5 сек → потеря сообщений | | 3 | Авито OAuth2 | Не описан | Token Manager с авторефрешем | Без токена API не работает | | 4 | Keyword-классификатор | Substring matching | pymorphy3 нормализация + substring | «стоимости», «построите» не матчились → лишние LLM-вызовы (+30-50% расход) | | 5 | Health check | Отсутствовал | `/health` endpoint + self-monitoring | Бот падает → никто не знает → лиды теряются | | 6 | LLM rate limit | Отсутствовал | Лимит вызовов/час на тенанта | Один тенант мог съесть весь бюджет LLM | | 7 | Scheduler | В основном процессе, блокирует event loop | Изолированный asyncio.Task с error boundary | Тяжёлые задачи scheduler блокировали ответы | | 8 | Structured logging | В requirements, но без паттерна | Полный паттерн с tenant_id, chat_id, intent | Без контекста в логах — невозможно дебажить | | 9 | Дедупликация | Заявлена, не реализована | Полная реализация по (chat_id, text_hash, window) | Авито шлёт дубли webhook'ов → двойные ответы | | 10 | Graceful shutdown | signal.signal → create_task (ненадёжно) | loop.add_signal_handler + await | Потеря сообщений при рестарте | | 11 | Миграции БД | Отсутствовали | Alembic | Изменение схемы → ручная работа → потеря данных | | 12 | Dedup + Antispam | После tenant lookup | Сразу после приёма (до тяжёлых операций) | Экономия ресурсов на спаме/дублях | | 13 | Структура проекта | Без queue/, migrations/ | Добавлены queue/, migrations/, avito/oauth.py | Новые модули | --- ## 0. ИНСТРУКЦИЯ ДЛЯ AI-РАЗРАБОТЧИКА ### 0.1 Перед началом - Прочитай ВСЁ ТЗ целиком перед написанием первой строки кода - Если что-то неясно — спроси, не додумывай - Каждый модуль должен работать автономно (можно тестировать отдельно) - Пиши тесты ВМЕСТЕ с кодом, не после - Каждая фаза заканчивается работающим тестом ### 0.2 Workflow разработки **ФАЗА 1/5: Ядро + CLI-тест** (можно тестировать в терминале) - Файлы: config.py, storage/database.py, storage/models.py, migrations/ (начальная), core/llm.py, core/llm_rate_limiter.py, core/privacy.py, core/antispam.py, core/dedup.py, core/dialog_matrix.py, core/keyword_classifier.py, agents/classifier.py, agents/responder.py, core/trust.py, core/eval.py, core/memory.py, core/router.py, core/logging_config.py - Тест: `python test_cli.py "Сколько стоит дом?"` → правильный ответ - Критерий: 20 тестовых сценариев проходят в CLI - **Требование:** PostgreSQL доступен локально, начальная миграция применена **ФАЗА 2/5: Авито webhook** (основной канал) - Файлы: adapters/base.py, adapters/avito_adapter.py, adapters/avito_oauth.py, queue/message_queue.py, queue/worker.py, tenants/manager.py, tenants/loader.py, main.py, health.py - Тест: сообщение в Авито → ответ в Авито - Критерий: бот отвечает на реальные сообщения Авито, `/health` возвращает 200 **ФАЗА 3/5: Квалификация + уведомления владельцу** - Файлы: agents/qualifier.py, agents/escalator.py, core/notifications.py, core/scheduler.py - Тест: HOT-лид → уведомление в Telegram владельцу - Критерий: лиды записываются в БД с правильной категорией **ФАЗА 4/5: Отчёты + команды владельца** - Файлы: reports/reporter.py, adapters/telegram_owner_adapter.py - Тест: /leads, /stats, /update_price работают - Критерий: еженедельный отчёт генерируется **ФАЗА 5/5: Доп. каналы + деплой** - Файлы: adapters/telegram_client_adapter.py, adapters/max_adapter.py, systemd - Тест: бот работает на 2+ каналах одновременно - Критерий: интеграционный тест ### 0.3 Контракты между модулями ``` Adapter → Queue: IncomingMessage(channel, tenant_id, chat_id, user_id, user_name, text, media_type, timestamp, raw, message_hash) → HTTP 200 (немедленно, до обработки) Queue → Worker → Router: IncomingMessage → OutgoingMessage(channel, chat_id, text, attachments) | None Router → KeywordClassifier (бесплатно, первый): (text: str) → KeywordResult(intent: str|None, confidence: float) Router → LLM Classifier (платно, если keyword не справился): (text: str, history: list[dict]) → ClassificationResult(intent, confidence, entities) Router → DialogMatrix (бесплатно): (intent: str) → str | None (готовый ответ или None) Router → Responder (платно, если матрица не справилась): (ClassificationResult, tenant: Tenant, history: list[dict], channel: str) → Response(text, facts_used) Router → Evaluator: (Response, tenant: Tenant) → EvalResult(passed: bool, checks: list) Router → Qualifier: (entities: dict, history: list[dict]) → QualificationResult(category: A|B|C|D, collected_data) Router → Escalator: (IncomingMessage, tenant, QualificationResult) → None (side effect: Telegram notification) Router → Notifications: (lead: dict, tenant: Tenant) → None (side effect: Telegram message to owner) LLMRateLimiter → LLMClient: check(tenant_id) → bool (True = можно вызывать, False = лимит превышен) ``` ### 0.4 Справочные материалы - **Авито Messenger API:** https://developers.avito.ru/api-catalog/messenger - **Авито Auth:** https://developers.avito.ru/api-catalog/auth - **MAX Bot API:** https://dev.max.ru/docs — endpoint: https://platform-api.max.ru/ - **DeepSeek API:** https://platform.deepseek.com/api-docs (OpenAI-совместимый) - **OpenRouter API:** https://openrouter.ai/docs (для Qwen fallback) - **aiogram 3.x:** https://docs.aiogram.dev/en/latest/ - **asyncpg:** https://magicstack.github.io/asyncpg/ - **Alembic:** https://alembic.sqlalchemy.org/ - **pymorphy3:** https://pymorphy2.readthedocs.io/en/stable/ (API совместим) --- ## 1. ОБЗОР ### 1.1 Что это Мультиканальный AI-бот для автоматизации продаж МСП. Отвечает клиентам за 30 секунд, квалифицирует лиды, передаёт горячих владельцу бизнеса. Работает автономно на VPS без внешних платформ. ### 1.2 Ключевые принципы - **Keyword-first:** 80% ответов без LLM (матрица + regex + pymorphy3). LLM только для нестандартных вопросов - **Мульти-тенант:** один инстанс обслуживает N клиентов - **Мультиканальный:** Авито (основной) + Telegram + MAX (через адаптеры) - **Два режима:** Авито = от первого лица владельца / Telegram = ассистент - **Privacy-first:** персданные НЕ уходят в LLM API, только локально - **Queue-first:** webhook отвечает мгновенно, обработка — в фоне - **Observable:** structured logging, health check, self-monitoring ### 1.3 Структура проекта ``` ai-sales-bot/ ├── main.py # Точка входа ├── config.py # Конфигурация (.env) ├── health.py # Health check endpoint ├── requirements.txt ├── .env.example ├── test_cli.py # CLI-тест (Фаза 1) │ ├── adapters/ # Канальные адаптеры │ ├── __init__.py │ ├── base.py # Базовый адаптер + IncomingMessage/OutgoingMessage │ ├── avito_adapter.py # Авито (webhook, FastAPI) │ ├── avito_oauth.py # Авито OAuth2 Token Manager │ ├── telegram_owner_adapter.py # Telegram для владельца (уведомления + команды) │ ├── telegram_client_adapter.py # Telegram для клиентов (aiogram) │ └── max_adapter.py # MAX │ ├── queue/ # Async-очередь сообщений │ ├── __init__.py │ ├── message_queue.py # asyncio.Queue + DB-backed persistence │ └── worker.py # Message Worker (достаёт из очереди, вызывает Router) │ ├── core/ # Ядро │ ├── __init__.py │ ├── router.py # Главный оркестратор │ ├── llm.py # DeepSeek + Qwen + шаблонный fallback │ ├── llm_rate_limiter.py # Rate limit LLM-вызовов по тенанту │ ├── keyword_classifier.py # Keyword-классификатор (pymorphy3 + substring) │ ├── dialog_matrix.py # Матрица готовых ответов (без LLM) │ ├── prompt_builder.py # Конструктор system prompt │ ├── memory.py # История диалогов из БД │ ├── eval.py # Проверка ответов │ ├── trust.py # Trust Matrix │ ├── privacy.py # Фильтрация персданных + regex entities │ ├── antispam.py # Антиспам + rate limiting │ ├── dedup.py # Дедупликация сообщений │ ├── notifications.py # Уведомления владельцу (Telegram) │ ├── scheduler.py # Follow-up + дайджесты (изолированный task) │ └── logging_config.py # Structured logging setup │ ├── agents/ # AI-агенты (используют LLM) │ ├── __init__.py │ ├── classifier.py # LLM-классификатор (fallback) │ ├── responder.py # Генерация ответов через LLM │ ├── qualifier.py # Квалификация лидов (логика в коде, не LLM) │ └── escalator.py # Эскалация + статус диалога │ ├── tenants/ # Мульти-тенант │ ├── __init__.py │ ├── manager.py # Управление клиентами │ ├── loader.py # Загрузка данных клиента │ └── onboarding.py # Автоонбординг через Telegram │ ├── storage/ # Хранение │ ├── __init__.py │ ├── database.py # PostgreSQL (asyncpg) │ └── models.py # Таблицы │ ├── migrations/ # Alembic миграции │ ├── alembic.ini │ ├── env.py │ └── versions/ │ └── 001_initial.py # Начальная миграция │ ├── reports/ # Отчёты │ ├── __init__.py │ └── reporter.py │ ├── data/ # Данные клиентов │ ├── _template/ # Шаблон нового клиента │ │ ├── config.yaml │ │ ├── company_profile.md │ │ ├── price_list.md │ │ ├── faq.md │ │ ├── portfolio.md │ │ ├── guarantees.md │ │ ├── objection_handlers.md │ │ ├── escalation_rules.md │ │ ├── dialog_matrix.yaml │ │ └── tone_of_voice.md │ └── {tenant_id}/ │ ├── tests/ │ ├── test_classifier.py │ ├── test_responder.py │ ├── test_qualifier.py │ ├── test_trust.py │ ├── test_matrix.py │ ├── test_dedup.py │ ├── test_queue.py │ ├── test_oauth.py │ └── test_scenarios.py # 20+ сценариев │ └── scripts/ ├── add_tenant.py ├── backup.sh └── init_db.py # Начальная настройка PostgreSQL ``` --- ## 2. ГЛАВНАЯ ЛОГИКА: ROUTER ### 2.1 Порядок обработки сообщения ```python async def process_message(message: IncomingMessage) -> OutgoingMessage | None: """ Вызывается из Worker (НЕ из webhook напрямую). Webhook только кладёт сообщение в очередь и возвращает HTTP 200. """ log = logger.bind( tenant_id=message.tenant_id, chat_id=message.chat_id, channel=message.channel, user_id=message.user_id, ) # 0. Дедупликация (по hash: chat_id + text + timestamp window) if await dedup.is_duplicate(message): log.debug("duplicate_skipped") return None # 1. Определить tenant tenant = tenant_manager.get_by_channel(message.channel, message.tenant_id) if not tenant: log.warning("tenant_not_found") return None # 2. Антиспам (до тяжёлых операций) if antispam.is_spam(message.text, message.user_id): log.info("spam_blocked") return None # 3. Проверить статус диалога conv = await storage.get_conversation(tenant.id, message.chat_id) if conv and conv.status == "escalated": log.info("escalated_forward") await notifications.forward_to_owner(message, tenant) return None # 4. Обработка медиа (фото, голосовые, документы) if message.media_type != "text": return handle_media(message, tenant) # 5. Проверить рабочие часы if not is_working_hours(tenant): text = (f"Добрый день! Ответим вам с {tenant.working_hours_start}:00. " f"Если срочно — звоните {tenant.phone}") log.info("outside_working_hours") return OutgoingMessage(channel=message.channel, chat_id=message.chat_id, text=text) # 6. Приветствие (первое сообщение в диалоге) greeting = "" if await is_first_message(tenant.id, message.chat_id): greeting = get_greeting_by_time(tenant.timezone) + " " # 7. Извлечь entities regex-ом (бесплатно, до LLM) entities = privacy.extract_entities(message.text) # 8. Сохранить контакты локально (если есть телефон/email) if entities.get("phone") or entities.get("email"): await storage.save_contact(message, entities, tenant.id) # 9. Очистить персданные из текста ПЕРЕД отправкой в LLM clean_text = privacy.strip_personal_data(message.text) # 10. Keyword-классификация (бесплатно, с нормализацией через pymorphy3) kw_result = keyword_classifier.classify(clean_text) log.info("keyword_result", intent=kw_result.intent, confidence=kw_result.confidence) # 11. Если keyword уверен → ответ из матрицы (0 токенов) if kw_result.intent and kw_result.confidence > 0.9: matrix_response = dialog_matrix.get_response(kw_result.intent, tenant.id) if matrix_response: response_text = greeting + matrix_response await storage.save_message(message, response_text, kw_result.intent, llm_used=False) await maybe_qualify(message, entities, tenant) log.info("matrix_hit", intent=kw_result.intent) return OutgoingMessage( channel=message.channel, chat_id=message.chat_id, text=truncate(response_text, message.channel) ) # 12. Keyword не справился → проверить LLM rate limit if not llm_rate_limiter.check(tenant.id): log.warning("llm_rate_limit_exceeded", tenant_id=tenant.id) # Fallback: дать шаблонный ответ, не вызывая LLM response_text = greeting + "Спасибо за обращение! Напишите подробнее — отвечу." await storage.save_message(message, response_text, "RATE_LIMITED", llm_used=False) return OutgoingMessage( channel=message.channel, chat_id=message.chat_id, text=response_text ) # 13. LLM-классификатор history = await memory.get_history_as_messages(tenant.id, message.chat_id, limit=10) classification = await classifier.classify(clean_text, history) log.info("llm_classified", intent=classification.intent, confidence=classification.confidence) # 14. Проверить матрицу ещё раз (по LLM-intent) matrix_response = dialog_matrix.get_response(classification.intent, tenant.id) if matrix_response and classification.confidence > 0.8: response_text = greeting + matrix_response await storage.save_message(message, response_text, classification.intent, llm_used=True) await maybe_qualify(message, entities, tenant) log.info("matrix_hit_after_llm", intent=classification.intent) return OutgoingMessage( channel=message.channel, chat_id=message.chat_id, text=truncate(response_text, message.channel) ) # 15. Матрица не справилась → LLM-ответ item_context = "" if message.channel == "avito" and message.raw.get("item_id"): item_context = await avito_adapter.get_item_text_cached( message.raw["item_id"], tenant ) response = await responder.respond( classification=classification, tenant=tenant, history=history, channel=message.channel, item_context=item_context, ) # 16. Eval pipeline eval_result = evaluator.check(response, tenant) if not eval_result.passed: response_text = greeting + "Уточню информацию и вернусь с ответом!" await escalator.escalate(message, tenant, reason="eval_failed", checks=eval_result.checks) await storage.save_message(message, response_text, classification.intent, eval_failed=True, llm_used=True) log.warning("eval_failed", checks=eval_result.checks) return OutgoingMessage( channel=message.channel, chat_id=message.chat_id, text=response_text ) # 17. Сохранить и ответить response_text = greeting + response.text await storage.save_message(message, response_text, classification.intent, llm_used=True) # 18. Квалификация await maybe_qualify(message, entities, tenant) log.info("response_sent", intent=classification.intent, response_time_ms=elapsed_ms()) return OutgoingMessage( channel=message.channel, chat_id=message.chat_id, text=truncate(response_text, message.channel) ) ``` ### 2.2 Лимиты длины по каналам ```python CHANNEL_LIMITS = { "avito": 1000, "telegram": 4096, "max": 4000, } def truncate(text: str, channel: str) -> str: limit = CHANNEL_LIMITS.get(channel, 1000) if len(text) > limit: return text[:limit - 3] + "..." return text ``` ### 2.3 Рабочие часы ```python from zoneinfo import ZoneInfo from datetime import datetime def is_working_hours(tenant) -> bool: now = datetime.now(ZoneInfo(tenant.timezone)) return tenant.working_hours_start <= now.hour < tenant.working_hours_end ``` > **Примечание:** используем `zoneinfo` (stdlib Python 3.9+) вместо `pytz`. Проще, нет ошибок с `localize()`. ### 2.4 Обработка медиа ```python def handle_media(message: IncomingMessage, tenant) -> OutgoingMessage | None: MEDIA_RESPONSES = { "photo": "Спасибо за фото! Подскажите, какой у вас вопрос?", "voice": "Напишите текстом — так удобнее для расчёта.", "document": "Получил документ. Оставьте номер — обсудим по телефону.", "sticker": None, "video": "Спасибо! Подскажите, какой у вас вопрос?", } text = MEDIA_RESPONSES.get(message.media_type) if text is None: return None return OutgoingMessage(channel=message.channel, chat_id=message.chat_id, text=text) ``` ### 2.5 Приветствие по времени ```python from zoneinfo import ZoneInfo from datetime import datetime def get_greeting_by_time(timezone: str) -> str: hour = datetime.now(ZoneInfo(timezone)).hour if 6 <= hour < 12: return "Доброе утро." elif 12 <= hour < 18: return "Добрый день." elif 18 <= hour < 23: return "Добрый вечер." else: return "Здравствуйте." ``` --- ## 3. QUEUE LAYER (НОВОЕ в v3.0) ### 3.1 Архитектура ``` Webhook (FastAPI) │ ├─→ Валидация payload ├─→ Создание IncomingMessage ├─→ Запись в message_queue таблицу (durable backup) ├─→ Положить в asyncio.Queue (in-memory, быстро) └─→ HTTP 200 (немедленно) Worker (отдельная asyncio.Task) │ ├─→ Достать из asyncio.Queue ├─→ Вызвать Router.process_message() ├─→ Отправить ответ через адаптер ├─→ Пометить в message_queue как обработано └─→ При ошибке: retry (макс 3 попытки), затем эскалация ``` ### 3.2 queue/message_queue.py ```python import asyncio import hashlib from dataclasses import dataclass from datetime import datetime @dataclass class QueuedMessage: message: "IncomingMessage" db_queue_id: int # ID записи в message_queue таблице attempt: int = 0 class MessageQueue: MAX_RETRIES = 3 def __init__(self, storage): self._queue: asyncio.Queue[QueuedMessage] = asyncio.Queue(maxsize=1000) self._storage = storage async def put(self, message: "IncomingMessage") -> None: """Вызывается из webhook. Сохраняет в БД и кладёт в in-memory очередь.""" db_id = await self._storage.enqueue_message( tenant_id=message.tenant_id, channel=message.channel, chat_id=message.chat_id, text=message.text, raw_payload=message.raw, ) await self._queue.put(QueuedMessage(message=message, db_queue_id=db_id)) async def get(self) -> QueuedMessage: """Вызывается из Worker. Блокирует до появления сообщения.""" return await self._queue.get() async def ack(self, queued: QueuedMessage) -> None: """Пометить сообщение как обработанное.""" await self._storage.dequeue_message(queued.db_queue_id) async def nack(self, queued: QueuedMessage, error: str) -> None: """Ошибка обработки. Retry или эскалация.""" queued.attempt += 1 if queued.attempt < self.MAX_RETRIES: await self._queue.put(queued) else: await self._storage.mark_queue_failed(queued.db_queue_id, error) async def recover_on_startup(self) -> int: """При старте: загрузить необработанные сообщения из БД в очередь.""" pending = await self._storage.get_pending_queue_messages() for row in pending: msg = IncomingMessage.from_db_row(row) await self._queue.put(QueuedMessage(message=msg, db_queue_id=row["id"], attempt=row["attempts"])) return len(pending) ``` ### 3.3 queue/worker.py ```python import asyncio import structlog logger = structlog.get_logger() class MessageWorker: def __init__(self, queue: MessageQueue, router, adapters: dict): self._queue = queue self._router = router self._adapters = adapters self._running = False async def run(self, num_workers: int = 3): """Запустить N параллельных воркеров.""" self._running = True workers = [ asyncio.create_task(self._worker_loop(i)) for i in range(num_workers) ] await asyncio.gather(*workers, return_exceptions=True) async def stop(self): self._running = False async def _worker_loop(self, worker_id: int): log = logger.bind(worker_id=worker_id) while self._running: try: queued = await asyncio.wait_for(self._queue.get(), timeout=1.0) except asyncio.TimeoutError: continue try: log.info("processing", chat_id=queued.message.chat_id, attempt=queued.attempt) result = await self._router.process_message(queued.message) if result: adapter = self._adapters.get(result.channel) if adapter: await adapter.send(result) await self._queue.ack(queued) except Exception as e: log.error("processing_failed", error=str(e), chat_id=queued.message.chat_id) await self._queue.nack(queued, str(e)) ``` --- ## 4. ДЕДУПЛИКАЦИЯ (НОВОЕ в v3.0) ### 4.1 core/dedup.py ```python import hashlib import time from collections import defaultdict class Deduplicator: """ Дедупликация по hash(chat_id + text) в скользящем окне. Авито гарантированно шлёт дубли webhook'ов — без этого модуля клиент получает двойные ответы. """ WINDOW_SECONDS = 10 # Окно дедупликации def __init__(self): self._seen: dict[str, float] = {} # hash → timestamp self._cleanup_counter = 0 async def is_duplicate(self, message) -> bool: msg_hash = self._hash(message.chat_id, message.text) now = time.monotonic() # Периодическая очистка (каждые 100 вызовов) self._cleanup_counter += 1 if self._cleanup_counter >= 100: self._cleanup(now) self._cleanup_counter = 0 if msg_hash in self._seen: if now - self._seen[msg_hash] < self.WINDOW_SECONDS: return True self._seen[msg_hash] = now return False def _hash(self, chat_id: str, text: str) -> str: return hashlib.sha256(f"{chat_id}:{text}".encode()).hexdigest()[:16] def _cleanup(self, now: float): expired = [k for k, v in self._seen.items() if now - v > self.WINDOW_SECONDS * 2] for k in expired: del self._seen[k] ``` --- ## 5. KEYWORD-КЛАССИФИКАТОР (с pymorphy3) ### 5.1 core/keyword_classifier.py ```python import pymorphy3 from dataclasses import dataclass morph = pymorphy3.MorphAnalyzer() @dataclass class KeywordResult: intent: str | None confidence: float # Ключевые слова в НОРМАЛЬНОЙ ФОРМЕ (лемма) KEYWORD_MAP = { "PRICE_GENERAL": ["стоить", "цена", "прайс", "расценка", "стоимость"], "PRICE_SPECIFIC": ["м²", "квадрат", "площадь"], "TIMELINE": ["срок", "долго", "построить время", "строить сколько"], "MATERIALS": ["материал", "газобетон", "кирпич", "блок"], "PORTFOLIO": ["работа показать", "пример", "фото дом", "портфолио"], "GUARANTEE": ["гарантия"], "LOCATION": ["где строить", "регион", "калининград", "область"], "MEETING": ["встреча", "замер", "приехать", "посмотреть"], "OBJECTION_PRICE": ["дорого", "дешёвый", "скидка"], "OBJECTION_TRUST": ["надёжный", "обман", "кидала", "отзыв"], "COMPLAINT": ["жалоба", "прокуратура", "суд", "обманщик"], "SPAM": ["продать", "оптом", "реклама", "заработок", "криптовалюта", "инвестиция"], "CONTACT": ["позвонить", "телефон", "номер", "whatsapp", "telegram", "связь"], "PROJECT": ["проект", "чертёж", "план дом"], "BOT_QUESTION": ["бот", "робот", "автоответчик", "живой человек"], } def _normalize(text: str) -> list[str]: """Привести каждое слово к нормальной форме (лемме).""" words = text.lower().split() lemmas = [] for word in words: # Убрать знаки пунктуации clean = word.strip(".,!?;:()\"'«»—-") if clean: parsed = morph.parse(clean) if parsed: lemmas.append(parsed[0].normal_form) return lemmas def classify(text: str) -> KeywordResult: """ Классификация по ключевым словам с нормализацией. В отличие от v2.0 (substring match), матчит морфологические формы: «стоимости» → «стоимость» ✓ «построите» → «построить» ✓ «гарантией» → «гарантия» ✓ """ lemmas = _normalize(text) lemma_text = " ".join(lemmas) for intent, keywords in KEYWORD_MAP.items(): for kw in keywords: # Если keyword — одно слово, ищем в леммах if " " not in kw: if kw in lemmas: return KeywordResult(intent=intent, confidence=0.95) else: # Многословный keyword: все слова должны присутствовать kw_parts = kw.split() if all(part in lemmas for part in kw_parts): return KeywordResult(intent=intent, confidence=0.90) # Fallback: прямой substring match (для м², цифр, спецсимволов) text_lower = text.lower() SUBSTRING_FALLBACK = { "PRICE_SPECIFIC": ["м²", "м2", "кв.м", "кв м"], "CONTACT": ["+7", "89", "@"], } for intent, substrings in SUBSTRING_FALLBACK.items(): for sub in substrings: if sub in text_lower: return KeywordResult(intent=intent, confidence=0.90) return KeywordResult(intent=None, confidence=0.0) ``` --- ## 6. МАТРИЦА ДИАЛОГОВ (без LLM) ### 6.1 core/dialog_matrix.py Загружает `data/{tenant_id}/dialog_matrix.yaml` при старте. ### 6.2 Формат dialog_matrix.yaml (пример для стройки) ```yaml # Матрица готовых ответов. Бот отвечает БЕЗ вызова LLM. # Формат: intent → текст ответа (1-2 предложения, от первого лица) PRICE_GENERAL: "Зависит от площади, материала и проекта. Какой площади дом планируете?" PRICE_SPECIFIC: "Считаю под конкретный проект. Расскажите подробнее — рассчитаю." TIMELINE: "Зависит от объёма. Расскажите что планируете — сориентирую по срокам." MATERIALS: "Работаю с газобетоном, кирпичом, блоками. Какой площади дом?" PORTFOLIO: "Могу показать примеры. Оставьте номер — отправлю фото готовых объектов." GUARANTEE: "Да, работаю по договору с гарантией. Детали обсудим по телефону." LOCATION: "Работаю по Калининграду и области. Где ваш участок?" MEETING: "Да, выезжаю на замер. Где находится участок?" OBJECTION_PRICE: "Цена зависит от объёма и материалов. Могу пересчитать с другими параметрами." OBJECTION_TRUST: "Работаю более 10 лет, всё по договору. Оставьте номер — покажу объекты." CONTACT: "Оставьте номер — перезвоню." PROJECT: "Помогу с этим. Оставьте номер — обсудим варианты." COMPLAINT: null # Эскалация, бот не отвечает SPAM: null # Не отвечать CONTACT_RECEIVED: "Принял. Перезвоню." UNKNOWN: "Я занимаюсь строительством. Чем могу помочь?" BOT_QUESTION: "Отвечаю быстро, чтобы вы не ждали. Чем могу помочь?" qualification_chain: - "Где находится участок?" - "Какой площади планируете дом?" - "Есть готовый проект?" - "Когда планируете начать?" ``` --- ## 7. PRIVACY (фильтрация персданных) ### 7.1 core/privacy.py ```python import re def extract_entities(text: str) -> dict: """Извлечь сущности regex-ом ДО отправки в LLM.""" entities = {} # Телефон (российские форматы) phone = re.search(r'[\+]?[78][\s\-\(]?\d{3}[\s\-\)]?\d{3}[\s\-]?\d{2}[\s\-]?\d{2}', text) if phone: entities["phone"] = phone.group() # Email email = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text) if email: entities["email"] = email.group() # Площадь area = re.search(r'(\d{2,3})\s*(?:м²|м2|кв\.?\s*м|квадрат)', text) if area: entities["area"] = int(area.group(1)) # Бюджет budget = re.search(r'(\d+(?:[.,]\d+)?)\s*(?:млн|миллион)', text) if budget: entities["budget"] = budget.group(1) return entities def strip_personal_data(text: str) -> str: """Заменить персданные на плейсхолдеры ПЕРЕД отправкой в LLM.""" cleaned = re.sub( r'[\+]?[78][\s\-\(]?\d{3}[\s\-\)]?\d{3}[\s\-]?\d{2}[\s\-]?\d{2}', '[ТЕЛЕФОН]', text ) cleaned = re.sub( r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]', cleaned ) return cleaned ``` --- ## 8. LLM (DeepSeek + Qwen + fallback) ### 8.1 core/llm.py ```python import httpx import structlog logger = structlog.get_logger() class LLMClient: FALLBACK_RESPONSE = "Спасибо за обращение! Напишите подробнее — отвечу." TIMEOUT = 15 # секунд async def complete( self, messages: list, temperature: float = 0.3, max_tokens: int = 300, tenant_id: str = "", ) -> str: log = logger.bind(tenant_id=tenant_id) # Уровень 1: DeepSeek try: result = await self._call_deepseek(messages, temperature, max_tokens) log.info("llm_call", provider="deepseek", status="ok") return result except Exception as e: log.warning("llm_call", provider="deepseek", status="failed", error=str(e)) # Уровень 2: Qwen через OpenRouter try: result = await self._call_openrouter(messages, temperature, max_tokens) log.info("llm_call", provider="openrouter", status="ok") return result except Exception as e: log.warning("llm_call", provider="openrouter", status="failed", error=str(e)) # Уровень 3: Шаблонный ответ log.critical("llm_all_failed") return self.FALLBACK_RESPONSE async def _call_deepseek(self, messages, temperature, max_tokens): async with httpx.AsyncClient(timeout=self.TIMEOUT) as client: resp = await client.post( "https://api.deepseek.com/v1/chat/completions", headers={"Authorization": f"Bearer {config.DEEPSEEK_API_KEY}"}, json={ "model": "deepseek-chat", "messages": messages, "temperature": temperature, "max_tokens": max_tokens, }, ) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"].strip() async def _call_openrouter(self, messages, temperature, max_tokens): async with httpx.AsyncClient(timeout=self.TIMEOUT) as client: resp = await client.post( "https://openrouter.ai/api/v1/chat/completions", headers={"Authorization": f"Bearer {config.OPENROUTER_API_KEY}"}, json={ "model": "qwen/qwen-2.5-72b-instruct", "messages": messages, "temperature": temperature, "max_tokens": max_tokens, }, ) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"].strip() ``` ### 8.2 core/llm_rate_limiter.py (НОВОЕ в v3.0) ```python import time from collections import defaultdict class LLMRateLimiter: """ Лимит LLM-вызовов по тенанту. Без лимита один тенант с большим трафиком съедает весь бюджет. """ DEFAULT_LIMIT_PER_HOUR = 200 # На тенанта def __init__(self, limit_per_hour: int = DEFAULT_LIMIT_PER_HOUR): self._limit = limit_per_hour self._calls: dict[str, list[float]] = defaultdict(list) def check(self, tenant_id: str) -> bool: """True = можно вызывать LLM. False = лимит исчерпан.""" now = time.monotonic() # Очистить старые записи self._calls[tenant_id] = [ t for t in self._calls[tenant_id] if now - t < 3600 ] if len(self._calls[tenant_id]) >= self._limit: return False self._calls[tenant_id].append(now) return True def get_usage(self, tenant_id: str) -> dict: """Текущее использование (для /stats).""" now = time.monotonic() active = [t for t in self._calls.get(tenant_id, []) if now - t < 3600] return {"used": len(active), "limit": self._limit, "remaining": self._limit - len(active)} ``` --- ## 9. АВИТО OAuth2 TOKEN MANAGER (НОВОЕ в v3.0) ### 9.1 adapters/avito_oauth.py ```python import httpx import asyncio import structlog from dataclasses import dataclass from datetime import datetime, timedelta logger = structlog.get_logger() @dataclass class AvitoToken: access_token: str expires_at: datetime @property def is_expired(self) -> bool: # Обновлять за 5 минут до истечения return datetime.utcnow() >= self.expires_at - timedelta(minutes=5) class AvitoOAuth: """ Авито OAuth2 Token Manager. - Токен живёт ~1 час - Автоматически обновляется за 5 мин до истечения - При 401 от API — принудительный рефреш + retry - Сохраняет токен в БД на случай рестарта """ TOKEN_URL = "https://api.avito.ru/token/" def __init__(self, client_id: str, client_secret: str, storage=None): self._client_id = client_id self._client_secret = client_secret self._storage = storage self._token: AvitoToken | None = None self._refresh_lock = asyncio.Lock() async def get_token(self) -> str: """Получить актуальный access_token. Рефрешит автоматически.""" if self._token and not self._token.is_expired: return self._token.access_token async with self._refresh_lock: # Double-check после получения лока if self._token and not self._token.is_expired: return self._token.access_token await self._refresh() return self._token.access_token async def force_refresh(self) -> str: """Принудительный рефреш (при 401).""" async with self._refresh_lock: await self._refresh() return self._token.access_token async def _refresh(self): logger.info("avito_oauth_refresh", client_id=self._client_id[:8] + "...") async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( self.TOKEN_URL, data={ "grant_type": "client_credentials", "client_id": self._client_id, "client_secret": self._client_secret, }, ) resp.raise_for_status() data = resp.json() self._token = AvitoToken( access_token=data["access_token"], expires_at=datetime.utcnow() + timedelta(seconds=data.get("expires_in", 3600)), ) # Сохранить в БД на случай рестарта if self._storage: await self._storage.save_avito_token( self._client_id, self._token.access_token, self._token.expires_at ) logger.info("avito_oauth_refreshed", expires_in=data.get("expires_in")) async def load_from_db(self): """При старте: попробовать загрузить токен из БД.""" if not self._storage: return saved = await self._storage.get_avito_token(self._client_id) if saved and saved["expires_at"] > datetime.utcnow() + timedelta(minutes=5): self._token = AvitoToken( access_token=saved["access_token"], expires_at=saved["expires_at"], ) logger.info("avito_oauth_loaded_from_db") ``` ### 9.2 Использование в avito_adapter.py ```python class AvitoAdapter: def __init__(self, oauth: AvitoOAuth, ...): self._oauth = oauth async def send(self, message: OutgoingMessage) -> None: token = await self._oauth.get_token() try: await self._send_message(token, message) except httpx.HTTPStatusError as e: if e.response.status_code == 401: # Токен протух — форсим рефреш и ретраим token = await self._oauth.force_refresh() await self._send_message(token, message) else: raise async def get_item_text_cached(self, item_id: str, tenant) -> str: """ Загрузить текст объявления с кешем (TTL 1 час). В v2.0 каждый запрос шёл в API → лишний latency. """ cache_key = f"item:{tenant.id}:{item_id}" cached = self._cache.get(cache_key) if cached: return cached token = await self._oauth.get_token() text = await self._fetch_item_text(token, item_id) self._cache.set(cache_key, text, ttl=3600) return text ``` --- ## 10. PROMPT BUILDER (конструктор промптов) ### 10.1 core/prompt_builder.py ```python AVITO_MODE = """ РЕЖИМ АВИТО — ты менеджер, говоришь от первого лица. ПРАВИЛА: 1. Говори «я», «делаю», «перезвоню» — НЕ «мы», «наш специалист» 2. Максимум 1-2 предложения 3. Цель — получить номер телефона 4. НЕ представляйся как бот 5. НЕ давай развёрнутых расчётов — «Оставьте номер — рассчитаю» 6. НЕ рекомендуй материалы — «Обсудим по телефону» """ TELEGRAM_MODE = """ РЕЖИМ TELEGRAM — ты ассистент владельца бизнеса. Можешь отвечать подробно, давать рекомендации, показывать таблицы. """ TRUST_RULES = """ ЗАПРЕЩЕНО: - Обещать скидки - Обсуждать юридику - Принимать оплату - Подтверждать факты которых НЕТ в данных компании - Называть точные цены без «точную стоимость рассчитаю после замера» - Называть сроки без «±2 недели в зависимости от условий» Если информации нет в данных — скажи «Уточню и вернусь с ответом» """ SAFETY_RULE = """ БЕЗОПАСНОСТЬ ДАННЫХ: Если клиент оставил телефон, email или адрес — НЕ повторяй их в ответе. Отвечай: «Принял. Перезвоню.» """ def build_system_prompt(tenant, intent: str, channel: str, item_context: str = "") -> str: parts = [] if channel == "avito": parts.append(AVITO_MODE) else: parts.append(TELEGRAM_MODE) parts.append(f"КОМПАНИЯ:\n{tenant.company_profile}") parts.append(f"ПРАВИЛА ЭСКАЛАЦИИ:\n{tenant.escalation_rules}") if item_context: parts.append(f"ОБЪЯВЛЕНИЕ (клиент пишет из этого объявления):\n{item_context}") if intent and intent.startswith("PRICE"): parts.append(f"ПРАЙС:\n{tenant.price_list}") elif intent == "PORTFOLIO": parts.append(f"ПОРТФОЛИО:\n{tenant.portfolio}") elif intent and intent.startswith("OBJECTION"): parts.append(f"ВОЗРАЖЕНИЯ:\n{tenant.objection_handlers}") elif intent == "GUARANTEE": parts.append(f"ГАРАНТИИ:\n{tenant.guarantees}") else: parts.append(f"FAQ:\n{tenant.faq}") parts.append(TRUST_RULES) parts.append(SAFETY_RULE) return "\n---\n".join(parts) ``` --- ## 11. LLM-КЛАССИФИКАТОР (fallback) ### 11.1 agents/classifier.py Вызывается ТОЛЬКО если keyword_classifier вернул confidence < 0.9. **Промпт:** ``` Определи intent сообщения клиента строительной компании. Выбери ОДИН из: PRICE_GENERAL, PRICE_SPECIFIC, TIMELINE, MATERIALS, PORTFOLIO, GUARANTEE, LOCATION, MEETING, OBJECTION_PRICE, OBJECTION_TRUST, OBJECTION_TIMELINE, FINANCING, PROCESS, DOCUMENTS, LAND, COMPARISON, SEASONAL, COMPLAINT, SPAM, CONTACT, PROJECT, BOT_QUESTION, UNKNOWN Ответ строго JSON: {"intent": "...", "confidence": 0.XX} ``` **Параметры:** temperature=0.1, max_tokens=100 --- ## 12. КВАЛИФИКАЦИЯ ЛИДОВ (в коде, не LLM) ### 12.1 agents/qualifier.py ```python from dataclasses import dataclass @dataclass class QualificationResult: category: str # A/B/C/D action: str score: int collected_data: dict def qualify_lead(entities: dict, history: list, intents: list) -> QualificationResult: score = 0 collected = {} if entities.get("phone"): score += 3 collected["phone"] = True if entities.get("area"): score += 2 collected["area"] = entities["area"] if entities.get("budget"): score += 2 collected["budget"] = entities["budget"] if "MEETING" in intents: score += 3 if "PROJECT" in intents: score += 1 if len(history) >= 6: score += 1 if score >= 5: return QualificationResult("A", "notify_owner_immediately", score, collected) elif score >= 3: return QualificationResult("B", "follow_up_24h", score, collected) elif score >= 1: return QualificationResult("C", "standard_response", score, collected) else: return QualificationResult("D", "close_politely", score, collected) ``` --- ## 13. ЭСКАЛАЦИЯ + СТАТУС ДИАЛОГА ### 13.1 agents/escalator.py **Статусы диалога:** ``` active → бот обрабатывает escalated → бот молчит, пересылает владельцу returned → владелец вернул боту closed → диалог завершён ``` **Триггеры эскалации:** - Intent = COMPLAINT - Intent = UNKNOWN (3 раза подряд) - Eval failed - Категория лида = A (HOT) - Ключевые слова: «суд», «жалоба», «прокуратура» **Формат уведомления:** ``` 🔥 Горячий лид! Канал: Авито Объявление: {item_title} Телефон: {phone} Площадь: {area} Бюджет: {budget} Последние сообщения: — Клиент: {msg1} — Бот: {reply1} — Клиент: {msg2} ``` **Таймер:** Если владелец не подтвердил контакт за 2 часа → повторное уведомление. Через 24ч → бот сам пишет клиенту: «Извините за задержку, менеджер свяжется с вами сегодня.» --- ## 14. МУЛЬТИ-ТЕНАНТ ### 14.1 Конфиг клиента (data/{tenant_id}/config.yaml) ```yaml tenant_id: "stroydream" company_name: "Александр — строительство" active: true channels: avito: enabled: true client_id: "AVITO_CLIENT_ID" client_secret: "AVITO_CLIENT_SECRET" user_id: 254744296 telegram_client: enabled: false bot_token: "" max: enabled: false owner: telegram_chat_id: "1036902910" name: "Александр" phone: "+79001234567" settings: timezone: "Europe/Kaliningrad" working_hours_start: 8 working_hours_end: 20 tone: "formal" max_response_length: 500 llm_rate_limit_per_hour: 200 # Лимит LLM-вызовов (НОВОЕ в v3.0) ``` ### 14.2 Маппинг tenant по каналам ```python # tenants/manager.py def get_by_channel(self, channel: str, identifier) -> Tenant | None: if channel == "avito": return self._avito_map.get(identifier) elif channel == "telegram": return self._telegram_map.get(identifier) elif channel == "max": return self._max_map.get(identifier) return None ``` ### 14.3 Авито webhook роутинг Один endpoint на всех: `POST /avito/webhook` Определяем tenant по `user_id` из payload. Webhook кладёт сообщение в очередь и немедленно возвращает HTTP 200. --- ## 15. ХРАНЕНИЕ (PostgreSQL) ### 15.1 storage/database.py ```python import asyncpg import structlog logger = structlog.get_logger() class Database: def __init__(self, dsn: str): self._dsn = dsn self._pool: asyncpg.Pool | None = None async def connect(self): self._pool = await asyncpg.create_pool( self._dsn, min_size=2, max_size=10, command_timeout=10, ) logger.info("db_connected", dsn=self._dsn.split("@")[-1]) # Логируем без пароля async def close(self): if self._pool: await self._pool.close() async def execute(self, query: str, *args): async with self._pool.acquire() as conn: return await conn.execute(query, *args) async def fetch(self, query: str, *args) -> list: async with self._pool.acquire() as conn: return await conn.fetch(query, *args) async def fetchrow(self, query: str, *args): async with self._pool.acquire() as conn: return await conn.fetchrow(query, *args) async def health_check(self) -> bool: """Для /health endpoint.""" try: await self.fetchrow("SELECT 1") return True except Exception: return False ``` ### 15.2 Таблицы (миграция 001_initial.py) ```sql -- Alembic migration: 001_initial CREATE TABLE conversations ( id SERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, channel TEXT NOT NULL, chat_id TEXT NOT NULL, user_id TEXT, user_name TEXT, item_id TEXT, status TEXT DEFAULT 'active', created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); CREATE TABLE messages ( id SERIAL PRIMARY KEY, conversation_id INTEGER REFERENCES conversations(id), direction TEXT NOT NULL, -- IN / OUT text TEXT NOT NULL, intent TEXT, confidence REAL, response_time_ms INTEGER, eval_passed BOOLEAN DEFAULT TRUE, llm_used BOOLEAN DEFAULT FALSE, -- НОВОЕ: отслеживание LLM-вызовов timestamp TIMESTAMPTZ DEFAULT NOW() ); CREATE TABLE leads ( id SERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, conversation_id INTEGER REFERENCES conversations(id), category TEXT DEFAULT 'D', score INTEGER DEFAULT 0, -- НОВОЕ: числовой score квалификации name TEXT, phone BYTEA, -- Шифрованный (Fernet) email BYTEA, -- Шифрованный area TEXT, budget TEXT, item_id TEXT, status TEXT DEFAULT 'NEW', owner_notified_at TIMESTAMPTZ, owner_confirmed_at TIMESTAMPTZ, notes TEXT, created_at TIMESTAMPTZ DEFAULT NOW() ); CREATE TABLE metrics ( id SERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, date DATE NOT NULL, total_messages INTEGER DEFAULT 0, total_conversations INTEGER DEFAULT 0, leads_a INTEGER DEFAULT 0, leads_b INTEGER DEFAULT 0, leads_c INTEGER DEFAULT 0, leads_d INTEGER DEFAULT 0, escalations INTEGER DEFAULT 0, eval_failures INTEGER DEFAULT 0, avg_response_time_ms INTEGER DEFAULT 0, llm_calls INTEGER DEFAULT 0, matrix_hits INTEGER DEFAULT 0, UNIQUE(tenant_id, date) -- НОВОЕ: один ряд на тенанта/день ); -- Очередь сообщений (durable backup для asyncio.Queue) CREATE TABLE message_queue ( id SERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, channel TEXT NOT NULL, chat_id TEXT NOT NULL, text TEXT NOT NULL, raw_payload JSONB, -- НОВОЕ: полный payload для восстановления attempts INTEGER DEFAULT 0, status TEXT DEFAULT 'pending', -- pending / processing / done / failed error TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), processed_at TIMESTAMPTZ ); -- Авито токены (НОВОЕ в v3.0) CREATE TABLE avito_tokens ( client_id TEXT PRIMARY KEY, access_token TEXT NOT NULL, expires_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ DEFAULT NOW() ); -- Индексы CREATE INDEX idx_conversations_tenant_chat ON conversations(tenant_id, chat_id); CREATE INDEX idx_conversations_status ON conversations(tenant_id, status); CREATE INDEX idx_messages_conversation ON messages(conversation_id, timestamp); CREATE INDEX idx_leads_tenant ON leads(tenant_id, created_at); CREATE INDEX idx_leads_status ON leads(tenant_id, status); CREATE INDEX idx_queue_pending ON message_queue(status) WHERE status = 'pending'; CREATE INDEX idx_metrics_tenant_date ON metrics(tenant_id, date); ``` --- ## 16. HEALTH CHECK (НОВОЕ в v3.0) ### 16.1 health.py ```python from fastapi import APIRouter from datetime import datetime router = APIRouter() @router.get("/health") async def health_check(db, llm_client, queue, avito_oauth_map: dict): """ Health check endpoint. Проверяет: - БД (PostgreSQL) - Очередь (размер) - LLM API (последний успешный вызов) - Авито OAuth (токены валидны) """ checks = {} healthy = True # PostgreSQL try: db_ok = await db.health_check() checks["database"] = {"status": "ok" if db_ok else "error"} if not db_ok: healthy = False except Exception as e: checks["database"] = {"status": "error", "detail": str(e)} healthy = False # Очередь queue_size = queue._queue.qsize() checks["queue"] = { "status": "ok" if queue_size < 500 else "warning", "size": queue_size, } if queue_size >= 900: # Близко к maxsize=1000 healthy = False # Авито OAuth for tenant_id, oauth in avito_oauth_map.items(): token_ok = oauth._token and not oauth._token.is_expired checks[f"avito_oauth_{tenant_id}"] = { "status": "ok" if token_ok else "warning", } status_code = 200 if healthy else 503 return { "status": "healthy" if healthy else "degraded", "timestamp": datetime.utcnow().isoformat(), "checks": checks, } ``` ### 16.2 Self-monitoring (в scheduler) ```python async def self_health_check(self): """ Каждые 5 минут: проверить /health. Если degraded 3 раза подряд → Telegram-алерт владельцу/админу. """ try: async with httpx.AsyncClient(timeout=5) as client: resp = await client.get("http://localhost:8000/health") data = resp.json() if data["status"] != "healthy": self._health_failures += 1 if self._health_failures >= 3: await self._alert_admin(f"⚠️ Бот в degraded состоянии: {data['checks']}") else: self._health_failures = 0 except Exception as e: self._health_failures += 1 if self._health_failures >= 3: await self._alert_admin(f"🔴 Health check недоступен: {e}") ``` --- ## 17. SCHEDULER (изолированный) ### 17.1 core/scheduler.py ```python import asyncio import structlog logger = structlog.get_logger() class Scheduler: """ Периодические задачи. Работает в изолированном asyncio.Task с собственной error boundary — падение scheduler НЕ убивает основной процесс. """ def __init__(self, storage, notifications, queue): self._storage = storage self._notifications = notifications self._queue = queue self._running = False self._health_failures = 0 async def run(self): self._running = True while self._running: try: await self._tick() except Exception as e: logger.error("scheduler_tick_failed", error=str(e)) # Не роняем — логируем и продолжаем await asyncio.sleep(300) # Каждые 5 минут async def stop(self): self._running = False async def _tick(self): await self.check_follow_ups() await self.check_owner_reminders() await self.send_daily_digest() await self.retry_queued_messages() await self.cleanup_old_data() await self.self_health_check() async def check_follow_ups(self): """WARM-лиды без ответа 24ч → follow-up.""" leads = await self._storage.fetch( "SELECT * FROM leads WHERE category='B' " "AND created_at < NOW() - INTERVAL '24 hours' AND status='NEW'" ) for lead in leads: # Отправить follow-up через соответствующий адаптер pass async def check_owner_reminders(self): """HOT-лид, владелец не подтвердил за 2ч → повторное уведомление.""" leads = await self._storage.fetch( "SELECT * FROM leads WHERE category='A' " "AND owner_notified_at IS NOT NULL " "AND owner_confirmed_at IS NULL " "AND owner_notified_at < NOW() - INTERVAL '2 hours'" ) for lead in leads: await self._notifications.remind_owner(lead) async def send_daily_digest(self): """20:00 по timezone владельца → дайджест в Telegram.""" pass async def retry_queued_messages(self): """Повторная отправка из очереди (при предыдущих ошибках).""" failed = await self._storage.fetch( "SELECT * FROM message_queue WHERE status='failed' AND attempts < 3" ) for msg in failed: await self._queue.put(msg) async def cleanup_old_data(self): """Удалить персданные старше 90 дней (152-ФЗ).""" await self._storage.execute( "UPDATE leads SET phone=NULL, email=NULL " "WHERE created_at < NOW() - INTERVAL '90 days' AND phone IS NOT NULL" ) ``` --- ## 18. STRUCTURED LOGGING (НОВОЕ в v3.0) ### 18.1 core/logging_config.py ```python import structlog import logging import sys def setup_logging(log_level: str = "INFO", log_file: str = None): """ Structured logging с контекстом. Каждое сообщение лога содержит: tenant_id, chat_id, channel, intent, response_time_ms. Это критично для отладки в продакшене. """ processors = [ structlog.contextvars.merge_contextvars, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, ] if sys.stderr.isatty(): # Dev: human-readable processors.append(structlog.dev.ConsoleRenderer()) else: # Prod: JSON (для grep, jq, лог-агрегаторов) processors.append(structlog.processors.JSONRenderer()) structlog.configure( processors=processors, wrapper_class=structlog.make_filtering_bound_logger( getattr(logging, log_level.upper()) ), context_class=dict, logger_factory=structlog.PrintLoggerFactory(), ) # Файловый лог (в дополнение к stdout) if log_file: file_handler = logging.FileHandler(log_file) file_handler.setLevel(getattr(logging, log_level.upper())) logging.root.addHandler(file_handler) ``` ### 18.2 Пример использования ```python import structlog logger = structlog.get_logger() # В router.py — bind контекста один раз log = logger.bind(tenant_id="stroydream", chat_id="abc123", channel="avito") log.info("message_received", text_length=len(text)) log.info("keyword_result", intent="PRICE_GENERAL", confidence=0.95) log.info("matrix_hit", intent="PRICE_GENERAL") # tenant_id, chat_id уже в контексте # Вывод (JSON): # {"event":"matrix_hit","intent":"PRICE_GENERAL","tenant_id":"stroydream","chat_id":"abc123","channel":"avito","timestamp":"2026-03-29T12:34:56Z","level":"info"} ``` --- ## 19. КОМАНДЫ ВЛАДЕЛЬЦА (Telegram) ### 19.1 adapters/telegram_owner_adapter.py ``` /leads — список лидов за сегодня /leads week — за неделю /stats — статистика (диалоги, лиды, конверсия, LLM usage) /update_price — обновить прайс (прислать текст или файл) /update_faq — добавить вопрос в FAQ /pause — приостановить бота /resume — возобновить бота /return {chat_id} — вернуть диалог боту (после эскалации) /close {chat_id} — закрыть диалог /health — статус бота (НОВОЕ в v3.0) ``` --- ## 20. EVAL PIPELINE ### 20.1 core/eval.py ```python import re from dataclasses import dataclass @dataclass class EvalResult: passed: bool checks: list[tuple[str, bool]] class Evaluator: FORBIDDEN_PATTERNS = [ r"скидк[аиуе]", r"бесплатно", r"гарантирую", r"100%", r"точная (цена|стоимость)", r"(возврат|вернём) деньг", ] def check(self, response, tenant) -> EvalResult: checks = [] # 1. Запрещённые слова forbidden_found = any( re.search(p, response.text, re.I) for p in self.FORBIDDEN_PATTERNS ) checks.append(("forbidden", not forbidden_found)) # 2. Длина ответа checks.append(("length", len(response.text) <= tenant.max_response_length)) # 3. Первое лицо для Авито if response.channel == "avito": has_we = bool(re.search(r'\b(мы|наш|наша|наше|наши)\b', response.text, re.I)) checks.append(("first_person", not has_we)) # 4. Не повторяет персданные has_phone = bool(re.search(r'\d{10,11}', response.text)) checks.append(("no_personal", not has_phone)) passed = all(ok for _, ok in checks) return EvalResult(passed=passed, checks=checks) ``` --- ## 21. АНТИСПАМ ### 21.1 core/antispam.py ```python import time from collections import defaultdict MAX_PER_MINUTE = 5 MAX_PER_HOUR = 30 SPAM_KEYWORDS = [ "продам", "оптом", "реклама", "заработок", "криптовалют", "инвестиц", "казино", "ставки", "кредит без", ] class AntiSpam: def __init__(self): self._timestamps: dict[str, list[float]] = defaultdict(list) def is_spam(self, text: str, user_id: str) -> bool: # Rate limit now = time.monotonic() self._timestamps[user_id] = [ t for t in self._timestamps[user_id] if now - t < 3600 ] recent_1m = sum(1 for t in self._timestamps[user_id] if now - t < 60) if recent_1m >= MAX_PER_MINUTE: return True if len(self._timestamps[user_id]) >= MAX_PER_HOUR: return True self._timestamps[user_id].append(now) # Keyword check text_lower = text.lower() if any(kw in text_lower for kw in SPAM_KEYWORDS): return True return False ``` --- ## 22. КОНФИГУРАЦИЯ ### 22.1 .env ```env # LLM (НИКОГДА не в коде!) DEEPSEEK_API_KEY=sk-... OPENROUTER_API_KEY=sk-or-... # PostgreSQL DATABASE_URL=postgresql://bot:password@localhost:5432/salesbot # Шифрование персданных ENCRYPTION_KEY=... # Fernet key # Admin ADMIN_TELEGRAM_TOKEN=... ADMIN_TELEGRAM_CHAT_ID=... # Логи LOG_LEVEL=INFO LOG_FILE=logs/bot.log # Workers (НОВОЕ в v3.0) QUEUE_WORKERS=3 QUEUE_MAX_SIZE=1000 ``` ### 22.2 requirements.txt ``` aiogram>=3.4 fastapi>=0.110 uvicorn>=0.29 httpx>=0.27 pydantic>=2.0 pyyaml>=6.0 python-dotenv>=1.0 structlog>=24.0 cryptography>=42.0 pymorphy3>=2.0 asyncpg>=0.29 alembic>=1.13 ``` > **Изменения vs v2.0:** убран `pytz` (используем stdlib `zoneinfo`), убран `aiosqlite`, добавлены `asyncpg`, `alembic`, `pymorphy3`. --- ## 23. ТОЧКА ВХОДА ### 23.1 main.py ```python import asyncio import signal import structlog from fastapi import FastAPI from contextlib import asynccontextmanager from config import config from core.logging_config import setup_logging from storage.database import Database from queue.message_queue import MessageQueue from queue.worker import MessageWorker from core.router import Router from core.scheduler import Scheduler from health import router as health_router logger = structlog.get_logger() @asynccontextmanager async def lifespan(app: FastAPI): """Startup и shutdown в одном месте.""" setup_logging(config.LOG_LEVEL, config.LOG_FILE) # Startup db = Database(config.DATABASE_URL) await db.connect() queue = MessageQueue(db) recovered = await queue.recover_on_startup() if recovered: logger.info("queue_recovered", count=recovered) router = Router(db=db, ...) worker = MessageWorker(queue=queue, router=router, adapters={...}) scheduler = Scheduler(storage=db, ...) # Запустить воркеры и scheduler как фоновые задачи worker_task = asyncio.create_task(worker.run(num_workers=config.QUEUE_WORKERS)) scheduler_task = asyncio.create_task(scheduler.run()) logger.info("bot_started", workers=config.QUEUE_WORKERS) yield # Приложение работает # Shutdown logger.info("shutting_down") await worker.stop() await scheduler.stop() # Дождаться завершения текущих задач (макс 10 сек) worker_task.cancel() scheduler_task.cancel() try: await asyncio.wait_for( asyncio.gather(worker_task, scheduler_task, return_exceptions=True), timeout=10, ) except asyncio.TimeoutError: logger.warning("shutdown_timeout") await db.close() logger.info("shutdown_complete") app = FastAPI(lifespan=lifespan) app.include_router(health_router) # Авито webhook @app.post("/avito/webhook") async def avito_webhook(payload: dict): """Принять webhook, положить в очередь, вернуть 200 НЕМЕДЛЕННО.""" message = avito_adapter.parse_webhook(payload) if message: await queue.put(message) return {"ok": True} ``` --- ## 24. ТЕСТЫ (20+ сценариев) ```python SCENARIOS = [ # === Keyword + pymorphy3 → матрица (0 токенов) === {"input": "Сколько стоит дом?", "expect_intent": "PRICE_GENERAL", "expect_llm": False, "expect_contains": "площад"}, {"input": "Какова стоимость строительства?", "expect_intent": "PRICE_GENERAL", "expect_llm": False, "comment": "pymorphy3: стоимость → стоимость"}, {"input": "Покажите ваши работы", "expect_intent": "PORTFOLIO", "expect_llm": False}, {"input": "Дорого у вас", "expect_intent": "OBJECTION_PRICE", "expect_llm": False}, {"input": "Какие гарантии даёте?", "expect_intent": "GUARANTEE", "expect_llm": False, "comment": "pymorphy3: гарантии → гарантия"}, # === Regex entities === {"input": "Позвоните мне 89001234567", "expect_entity": "phone", "expect_response_contains": "Принял"}, {"input": "Хочу дом 120 м²", "expect_entity_area": 120}, # === Trust Matrix === {"input": "Дайте скидку 20%", "expect_not_contains": "скидк"}, {"input": "Вы строите за 2 недели, верно?", "expect_not_contains": "да", "comment": "Не подтверждать непроверенные факты"}, # === Privacy === {"input": "Мой номер +79001234567", "expect_llm_input_not_contains": "+7900", "comment": "Телефон не уходит в LLM"}, # === Квалификация === {"history": ["Хочу дом 150м²", "Бюджет 8 млн", "Позвоните: 89001234567"], "expect_category": "A"}, {"history": ["Сколько стоит?"], "expect_category": "D"}, # === Медиа === {"input_media": "photo", "expect_contains": "фото"}, {"input_media": "voice", "expect_contains": "текстом"}, # === Спам === {"input": "Продам кирпич оптом дёшево", "expect_no_response": True}, # === "Ты бот?" === {"input": "Это бот отвечает?", "expect_not_contains": "бот", "expect_contains": "помочь"}, # === Рабочие часы === {"input": "Привет", "time": "03:00", "expect_contains": "Ответим"}, # === Eval === {"input": "Какая точная цена?", "expect_not_contains": "точная цена"}, # === Follow-up (async) === {"scenario": "warm_no_response_24h", "expect_follow_up": True}, # === Эскалация === {"input": "Позвоню в прокуратуру!", "expect_escalation": True}, # === Дедупликация (НОВОЕ) === {"input": "Привет", "repeat_within_seconds": 5, "expect_single_response": True, "comment": "Повторный webhook от Авито → один ответ"}, # === LLM Rate Limit (НОВОЕ) === {"scenario": "exhaust_llm_limit", "expect_fallback_response": True, "comment": "После 200 LLM-вызовов → шаблонный ответ, не ошибка"}, # === Queue recovery (НОВОЕ) === {"scenario": "restart_with_pending_queue", "expect_messages_processed": True, "comment": "После рестарта — необработанные из БД попадают в очередь"}, ] ``` --- ## 25. ДЕПЛОЙ ### 25.1 Подготовка сервера ```bash # PostgreSQL sudo apt install postgresql postgresql-contrib sudo -u postgres createuser bot sudo -u postgres createdb salesbot -O bot sudo -u postgres psql -c "ALTER USER bot PASSWORD 'your_password';" # Python sudo apt install python3.11 python3.11-venv # Проект cd /opt/ai-sales-bot python3.11 -m venv venv source venv/bin/activate pip install -r requirements.txt # Миграции alembic upgrade head ``` ### 25.2 systemd ```ini [Unit] Description=AI Sales Bot After=network.target postgresql.service Requires=postgresql.service [Service] Type=simple User=bot WorkingDirectory=/opt/ai-sales-bot ExecStart=/opt/ai-sales-bot/venv/bin/python -m uvicorn main:app --host 0.0.0.0 --port 8000 Restart=always RestartSec=10 EnvironmentFile=/opt/ai-sales-bot/.env # Graceful shutdown KillSignal=SIGTERM TimeoutStopSec=15 [Install] WantedBy=multi-user.target ``` ### 25.3 Бэкап (cron) ```bash # Ежедневно 03:00 0 3 * * * /opt/ai-sales-bot/scripts/backup.sh # backup.sh #!/bin/bash DATE=$(date +%Y-%m-%d) PGPASSWORD="your_password" pg_dump -U bot -h localhost salesbot \ > /opt/ai-sales-bot/backups/salesbot_${DATE}.sql gzip /opt/ai-sales-bot/backups/salesbot_${DATE}.sql # Хранить 30 дней find /opt/ai-sales-bot/backups/ -name "*.sql.gz" -mtime +30 -delete ``` --- *ТЗ v3.0 — 2026-03-29* *Архитектурное ревью и доработки: Software Architect Agent v2.1*