Skip to main content
Проект: Интеллектуальная система мониторинга цен конкурентов
Модуль: Watcher / Celery
Версия: 2.1
Дата: Январь 2026

7.1 Обзор

Назначение

Celery обеспечивает выполнение фоновых и периодических задач модуля Watcher:
  • Генерация задач парсинга по расписанию
  • Проверка алертов и отправка уведомлений
  • Мониторинг состояния агентов
  • Очистка устаревших данных
  • Расчёт приоритетов мониторинга

Архитектура


7.2 Конфигурация Celery

Настройки

# app/core/celery_config.py

from celery import Celery
from celery.schedules import crontab
from kombu import Queue

from app.core.config import settings


# Создание приложения Celery
celery_app = Celery(
    "adolf_watcher",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.tasks.watcher.generation",
        "app.tasks.watcher.alerts",
        "app.tasks.watcher.agents",
        "app.tasks.watcher.cleanup",
        "app.tasks.watcher.priorities",
        "app.tasks.watcher.notifications",
    ]
)

# Конфигурация
celery_app.conf.update(
    # Сериализация
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    
    # Таймзона
    timezone="Europe/Moscow",
    enable_utc=True,
    
    # Очереди
    task_queues=(
        Queue("default", routing_key="default"),
        Queue("critical", routing_key="critical"),
        Queue("heavy", routing_key="heavy"),
    ),
    task_default_queue="default",
    task_default_routing_key="default",
    
    # Роутинг задач
    task_routes={
        "app.tasks.watcher.agents.*": {"queue": "critical"},
        "app.tasks.watcher.notifications.*": {"queue": "critical"},
        "app.tasks.watcher.priorities.*": {"queue": "heavy"},
        "app.tasks.watcher.*": {"queue": "default"},
    },
    
    # Ретраи
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    
    # Лимиты
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    
    # Результаты
    result_expires=3600,  # 1 час
    
    # Мониторинг
    worker_send_task_events=True,
    task_send_sent_event=True,
)

# Расписание периодических задач
celery_app.conf.beat_schedule = {
    # === Генерация задач ===
    "watcher-generate-daily-tasks": {
        "task": "app.tasks.watcher.generation.generate_daily_tasks",
        "schedule": crontab(hour=20, minute=30),  # 20:30 MSK
        "options": {"queue": "default"}
    },
    
    # === Проверка алертов ===
    "watcher-check-alerts": {
        "task": "app.tasks.watcher.alerts.check_alerts",
        "schedule": crontab(minute="*/15"),  # Каждые 15 минут
        "options": {"queue": "default"}
    },
    
    # === Мониторинг агентов ===
    "watcher-sync-agent-status": {
        "task": "app.tasks.watcher.agents.sync_agent_status",
        "schedule": crontab(minute="*"),  # Каждую минуту
        "options": {"queue": "critical"}
    },
    
    # === Очистка данных ===
    "watcher-cleanup-old-tasks": {
        "task": "app.tasks.watcher.cleanup.cleanup_old_tasks",
        "schedule": crontab(hour=3, minute=0),  # 03:00 MSK
        "options": {"queue": "default"}
    },
    "watcher-cleanup-old-logs": {
        "task": "app.tasks.watcher.cleanup.cleanup_old_logs",
        "schedule": crontab(hour=3, minute=30),  # 03:30 MSK
        "options": {"queue": "default"}
    },
    
    # === Расчёт приоритетов (v2.0) ===
    "watcher-calculate-priorities": {
        "task": "app.tasks.watcher.priorities.calculate_priorities",
        "schedule": crontab(hour=4, minute=0),  # 04:00 MSK
        "options": {"queue": "heavy"}
    },
    
    # === Ежедневная статистика ===
    "watcher-daily-stats": {
        "task": "app.tasks.watcher.stats.generate_daily_stats",
        "schedule": crontab(hour=8, minute=0),  # 08:00 MSK
        "options": {"queue": "default"}
    },
    
    # === Проверка cookies ===
    "watcher-check-cookies-validity": {
        "task": "app.tasks.watcher.agents.check_cookies_validity",
        "schedule": crontab(hour=19, minute=30),  # 19:30 MSK (до копирования)
        "options": {"queue": "critical"}
    },
}

7.3 Задачи генерации

generate_daily_tasks

# app/tasks/watcher/generation.py

from celery import shared_task
from datetime import date
import logging

from app.core.database import get_db_session
from app.services.watcher.task_generator import TaskGenerator
from app.core.redis import get_redis


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.generation.generate_daily_tasks",
    bind=True,
    max_retries=3,
    default_retry_delay=300,  # 5 минут
    autoretry_for=(Exception,),
    acks_late=True
)
def generate_daily_tasks(self):
    """
    Генерация задач парсинга на ночь.
    
    Запускается ежедневно в 20:30.
    Создаёт задачи для всех активных подписок и их конкурентов.
    """
    logger.info("Starting daily task generation")
    
    try:
        with get_db_session() as db:
            redis = get_redis()
            generator = TaskGenerator(db=db, redis=redis)
            
            # Генерация задач
            stats = generator.generate_daily_tasks()
            
            total = sum(stats.values())
            
            logger.info(
                f"Task generation completed: {total} tasks created. "
                f"WB: {stats.get('wildberries', 0)}, "
                f"Ozon: {stats.get('ozon', 0)}, "
                f"YM: {stats.get('yandex_market', 0)}"
            )
            
            # Сохранение статистики
            redis.hset(
                f"watcher:generation:{date.today().isoformat()}",
                mapping={
                    "total": total,
                    "wildberries": stats.get("wildberries", 0),
                    "ozon": stats.get("ozon", 0),
                    "yandex_market": stats.get("yandex_market", 0),
                    "status": "completed"
                }
            )
            redis.expire(f"watcher:generation:{date.today().isoformat()}", 86400 * 7)
            
            return {
                "status": "success",
                "total_tasks": total,
                "by_marketplace": stats
            }
            
    except Exception as e:
        logger.error(f"Task generation failed: {e}")
        
        # Сохранение ошибки
        redis = get_redis()
        redis.hset(
            f"watcher:generation:{date.today().isoformat()}",
            mapping={
                "status": "failed",
                "error": str(e)
            }
        )
        
        raise


@shared_task(
    name="app.tasks.watcher.generation.regenerate_failed_tasks",
    bind=True
)
def regenerate_failed_tasks(self, scheduled_date: str = None):
    """
    Повторная генерация для провалившихся задач.
    
    Используется для ручного перезапуска.
    """
    if not scheduled_date:
        scheduled_date = date.today().isoformat()
    
    logger.info(f"Regenerating failed tasks for {scheduled_date}")
    
    with get_db_session() as db:
        # Находим провалившиеся задачи
        from sqlalchemy import select, and_
        from app.models.watcher import WatcherTask, TaskStatus
        
        query = select(WatcherTask).where(
            and_(
                WatcherTask.scheduled_date == scheduled_date,
                WatcherTask.status == TaskStatus.FAILED.value,
                WatcherTask.retry_count >= 3  # Исчерпаны попытки
            )
        )
        
        result = db.execute(query)
        failed_tasks = result.scalars().all()
        
        if not failed_tasks:
            return {"status": "no_failed_tasks"}
        
        # Сброс статуса и retry_count
        for task in failed_tasks:
            task.status = TaskStatus.PENDING.value
            task.retry_count = 0
            task.error = None
            task.agent_id = None
        
        db.commit()
        
        # Добавление обратно в очереди Redis
        redis = get_redis()
        for task in failed_tasks:
            queue_key = f"watcher:task_queue:{task.marketplace}"
            redis.rpush(queue_key, str(task.id))
        
        logger.info(f"Regenerated {len(failed_tasks)} failed tasks")
        
        return {
            "status": "success",
            "regenerated_count": len(failed_tasks)
        }

7.4 Задачи алертов

check_alerts

# app/tasks/watcher/alerts.py

from celery import shared_task
from datetime import datetime, timedelta
from typing import List, Dict
import logging

from app.core.database import get_db_session
from app.services.watcher.alert_engine import AlertEngine
from app.services.notifications import NotificationService


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.alerts.check_alerts",
    bind=True,
    max_retries=2,
    default_retry_delay=60
)
def check_alerts(self):
    """
    Проверка условий для генерации алертов.
    
    Запускается каждые 15 минут.
    Анализирует новые данные и создаёт алерты при превышении порогов.
    """
    logger.info("Starting alert check")
    
    with get_db_session() as db:
        alert_engine = AlertEngine(db=db)
        
        # Временное окно: последние 20 минут (с запасом)
        since = datetime.utcnow() - timedelta(minutes=20)
        
        # Проверка всех условий
        alerts_created = alert_engine.check_all_conditions(since=since)
        
        if alerts_created:
            logger.info(f"Created {len(alerts_created)} new alerts")
            
            # Отправка уведомлений
            for alert in alerts_created:
                send_alert_notification.delay(alert_id=str(alert.id))
        
        return {
            "status": "success",
            "alerts_created": len(alerts_created)
        }


@shared_task(
    name="app.tasks.watcher.alerts.send_alert_notification",
    bind=True,
    max_retries=3,
    default_retry_delay=30
)
def send_alert_notification(self, alert_id: str):
    """
    Отправка уведомления об алерте.
    
    Определяет получателей на основе severity и brand_id.
    """
    logger.info(f"Sending notification for alert {alert_id}")
    
    with get_db_session() as db:
        from sqlalchemy import select
        from app.models.watcher import WatcherAlert
        from app.models.users import User
        
        # Получение алерта
        query = select(WatcherAlert).where(WatcherAlert.id == alert_id)
        result = db.execute(query)
        alert = result.scalar_one_or_none()
        
        if not alert:
            logger.warning(f"Alert {alert_id} not found")
            return {"status": "alert_not_found"}
        
        # Определение получателей
        recipients = _get_alert_recipients(
            db=db,
            severity=alert.severity,
            brand_id=alert.brand_id,
            alert_type=alert.alert_type
        )
        
        if not recipients:
            logger.info(f"No recipients for alert {alert_id}")
            return {"status": "no_recipients"}
        
        # Отправка уведомлений
        notification_service = NotificationService()
        
        for user_id in recipients:
            notification_service.send(
                user_id=user_id,
                event_type=f"watcher_alert_{alert.alert_type}",
                data={
                    "alert_id": str(alert.id),
                    "sku": alert.sku,
                    "marketplace": alert.marketplace,
                    "alert_type": alert.alert_type,
                    "severity": alert.severity,
                    "details": alert.details
                }
            )
        
        logger.info(f"Sent notifications to {len(recipients)} users")
        
        return {
            "status": "success",
            "recipients_count": len(recipients)
        }


def _get_alert_recipients(
    db,
    severity: str,
    brand_id: str,
    alert_type: str
) -> List[str]:
    """Определение получателей алерта."""
    from sqlalchemy import select, or_
    from app.models.users import User
    
    recipients = []
    
    # Правила получателей по severity
    if severity == "critical":
        # Критичные: Manager по бренду + Senior + Director + Admin
        roles = ["manager", "senior", "director", "admin"]
    elif severity == "warning":
        # Предупреждения: Manager по бренду + Senior
        roles = ["manager", "senior"]
    else:
        # Информационные: только Manager по бренду
        roles = ["manager"]
    
    # Запрос пользователей
    query = select(User).where(
        User.role.in_(roles),
        User.is_active == True
    )
    
    result = db.execute(query)
    users = result.scalars().all()
    
    for user in users:
        # Фильтрация Manager по бренду
        if user.role == "manager":
            if user.brand_id == brand_id or user.brand_id == "all":
                recipients.append(str(user.id))
        else:
            # Senior, Director, Admin получают все
            recipients.append(str(user.id))
    
    return list(set(recipients))


@shared_task(
    name="app.tasks.watcher.alerts.process_dumping_alert",
    bind=True
)
def process_dumping_alert(self, alert_id: str):
    """
    Дополнительная обработка алерта о демпинге.
    
    Может включать:
    - Автоматическое создание задачи для Marketing
    - Уведомление в Slack/Telegram
    - Логирование в отдельную систему
    """
    logger.info(f"Processing dumping alert {alert_id}")
    
    with get_db_session() as db:
        from sqlalchemy import select
        from app.models.watcher import WatcherAlert
        
        query = select(WatcherAlert).where(WatcherAlert.id == alert_id)
        result = db.execute(query)
        alert = result.scalar_one_or_none()
        
        if not alert or alert.alert_type != "dumping_detected":
            return {"status": "not_dumping_alert"}
        
        details = alert.details
        
        # Создание задачи для Marketing модуля (если интегрирован)
        # marketing_task = create_bid_adjustment_task(
        #     sku=alert.sku,
        #     marketplace=alert.marketplace,
        #     competitor_price=details.get("new_price"),
        #     reason="dumping_response"
        # )
        
        logger.info(f"Dumping alert {alert_id} processed")
        
        return {"status": "success"}

7.5 Задачи мониторинга агентов

sync_agent_status

# app/tasks/watcher/agents.py

from celery import shared_task
from datetime import datetime, timedelta
import logging

from app.core.database import get_db_session
from app.core.redis import get_redis
from app.services.watcher.agent_manager import AgentManager


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.agents.sync_agent_status",
    bind=True,
    max_retries=1
)
def sync_agent_status(self):
    """
    Синхронизация статуса агентов.
    
    Запускается каждую минуту.
    Проверяет heartbeat и помечает offline агентов.
    """
    with get_db_session() as db:
        redis = get_redis()
        agent_manager = AgentManager(db=db, redis=redis)
        
        # Проверка offline агентов
        offline_agents = agent_manager.check_offline_agents()
        
        if offline_agents:
            logger.warning(f"Agents went offline: {offline_agents}")
            
            # Перераспределение задач offline агентов
            for agent_id in offline_agents:
                redistribute_agent_tasks.delay(agent_id=agent_id)
        
        # Обновление общей статистики
        stats = agent_manager.get_aggregate_stats()
        
        redis.hset(
            "watcher:agents:stats",
            mapping={
                "total": stats["total"],
                "online": stats["online"],
                "offline": stats["offline"],
                "working": stats["working"],
                "updated_at": datetime.utcnow().isoformat()
            }
        )
        redis.expire("watcher:agents:stats", 120)
        
        return {
            "status": "success",
            "offline_count": len(offline_agents),
            "stats": stats
        }


@shared_task(
    name="app.tasks.watcher.agents.redistribute_agent_tasks",
    bind=True
)
def redistribute_agent_tasks(self, agent_id: str):
    """
    Перераспределение задач агента при его отключении.
    
    Возвращает in_progress задачи обратно в очередь.
    """
    logger.info(f"Redistributing tasks from agent {agent_id}")
    
    with get_db_session() as db:
        from sqlalchemy import select, update
        from app.models.watcher import WatcherTask, TaskStatus
        
        # Находим задачи агента
        query = select(WatcherTask).where(
            WatcherTask.agent_id == agent_id,
            WatcherTask.status == TaskStatus.IN_PROGRESS.value
        )
        
        result = db.execute(query)
        tasks = result.scalars().all()
        
        if not tasks:
            return {"status": "no_tasks"}
        
        redis = get_redis()
        
        # Возврат задач в очереди
        for task in tasks:
            # Сброс статуса
            task.status = TaskStatus.PENDING.value
            task.agent_id = None
            task.started_at = None
            
            # Добавление в очередь
            queue_key = f"watcher:task_queue:{task.marketplace}"
            redis.lpush(queue_key, str(task.id))  # В начало очереди
        
        db.commit()
        
        logger.info(f"Redistributed {len(tasks)} tasks from agent {agent_id}")
        
        return {
            "status": "success",
            "redistributed_count": len(tasks)
        }


@shared_task(
    name="app.tasks.watcher.agents.check_cookies_validity",
    bind=True
)
def check_cookies_validity(self):
    """
    Проверка валидности cookies агентов.
    
    Запускается в 19:30, до копирования cookies.
    Уведомляет администратора о проблемах.
    """
    logger.info("Checking cookies validity")
    
    with get_db_session() as db:
        from sqlalchemy import select
        from app.models.watcher import WatcherAgent
        
        query = select(WatcherAgent).where(WatcherAgent.cookies_valid == False)
        result = db.execute(query)
        invalid_agents = result.scalars().all()
        
        if invalid_agents:
            agent_names = [a.name for a in invalid_agents]
            logger.warning(f"Agents with invalid cookies: {agent_names}")
            
            # Уведомление администратора
            send_alert_notification.delay(
                alert_id=None,
                custom_notification={
                    "type": "cookies_warning",
                    "message": f"Агенты с невалидными cookies: {', '.join(agent_names)}",
                    "agents": agent_names,
                    "severity": "warning"
                }
            )
        
        return {
            "status": "success",
            "invalid_count": len(invalid_agents)
        }


@shared_task(
    name="app.tasks.watcher.agents.handle_panic_mode",
    bind=True
)
def handle_panic_mode(self, agent_id: str, reason: str, details: dict):
    """
    Обработка Panic Mode агента.
    
    Вызывается при получении сигнала PANIC от агента.
    """
    logger.warning(f"Handling panic mode for agent {agent_id}: {reason}")
    
    with get_db_session() as db:
        redis = get_redis()
        
        from sqlalchemy import select, update
        from app.models.watcher import WatcherAgent, WatcherAgentLog
        
        # Обновление статуса агента
        query = (
            update(WatcherAgent)
            .where(WatcherAgent.id == agent_id)
            .values(status="panic")
        )
        db.execute(query)
        
        # Логирование события
        log = WatcherAgentLog(
            agent_id=agent_id,
            event_type="panic",
            severity="error",
            message=reason,
            details=details
        )
        db.add(log)
        db.commit()
        
        # Блокировка IP (если известен)
        agent_ip = details.get("ip")
        if agent_ip:
            redis.setex(
                f"watcher:ip_blocked:{agent_ip}",
                3600,  # 1 час
                reason
            )
        
        # Перераспределение задач
        redistribute_agent_tasks.delay(agent_id=agent_id)
        
        # Уведомление администратора
        from app.services.notifications import NotificationService
        notification_service = NotificationService()
        
        notification_service.send_to_admins(
            event_type="watcher_agent_panic",
            data={
                "agent_id": agent_id,
                "reason": reason,
                "details": details
            }
        )
        
        return {"status": "success"}

7.6 Задачи очистки данных

cleanup_old_tasks

# app/tasks/watcher/cleanup.py

from celery import shared_task
from datetime import datetime, timedelta, date
import logging

from app.core.database import get_db_session


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.cleanup.cleanup_old_tasks",
    bind=True,
    max_retries=2
)
def cleanup_old_tasks(self, days: int = 90):
    """
    Удаление старых задач.
    
    Запускается ежедневно в 03:00.
    Удаляет задачи старше 90 дней.
    """
    logger.info(f"Cleaning up tasks older than {days} days")
    
    with get_db_session() as db:
        from sqlalchemy import delete
        from app.models.watcher import WatcherTask
        
        cutoff_date = date.today() - timedelta(days=days)
        
        query = delete(WatcherTask).where(
            WatcherTask.scheduled_date < cutoff_date
        )
        
        result = db.execute(query)
        deleted_count = result.rowcount
        db.commit()
        
        logger.info(f"Deleted {deleted_count} old tasks")
        
        # VACUUM (через raw SQL)
        db.execute("VACUUM ANALYZE watcher_tasks")
        
        return {
            "status": "success",
            "deleted_count": deleted_count,
            "cutoff_date": cutoff_date.isoformat()
        }


@shared_task(
    name="app.tasks.watcher.cleanup.cleanup_old_logs",
    bind=True,
    max_retries=2
)
def cleanup_old_logs(self, days: int = 30):
    """
    Удаление старых логов агентов.
    
    Запускается ежедневно в 03:30.
    Удаляет логи старше 30 дней.
    """
    logger.info(f"Cleaning up agent logs older than {days} days")
    
    with get_db_session() as db:
        from sqlalchemy import delete
        from app.models.watcher import WatcherAgentLog
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        
        query = delete(WatcherAgentLog).where(
            WatcherAgentLog.created_at < cutoff_date
        )
        
        result = db.execute(query)
        deleted_count = result.rowcount
        db.commit()
        
        logger.info(f"Deleted {deleted_count} old agent logs")
        
        return {
            "status": "success",
            "deleted_count": deleted_count
        }


@shared_task(
    name="app.tasks.watcher.cleanup.cleanup_resolved_alerts",
    bind=True
)
def cleanup_resolved_alerts(self, days: int = 365):
    """
    Удаление разрешённых алертов старше года.
    
    Запускается еженедельно.
    """
    logger.info(f"Cleaning up resolved alerts older than {days} days")
    
    with get_db_session() as db:
        from sqlalchemy import delete, and_
        from app.models.watcher import WatcherAlert
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        
        query = delete(WatcherAlert).where(
            and_(
                WatcherAlert.is_resolved == True,
                WatcherAlert.created_at < cutoff_date
            )
        )
        
        result = db.execute(query)
        deleted_count = result.rowcount
        db.commit()
        
        logger.info(f"Deleted {deleted_count} old resolved alerts")
        
        return {
            "status": "success",
            "deleted_count": deleted_count
        }


@shared_task(
    name="app.tasks.watcher.cleanup.cleanup_redis_keys",
    bind=True
)
def cleanup_redis_keys(self):
    """
    Очистка устаревших ключей в Redis.
    
    Удаляет ключи с истёкшим TTL и orphan данные.
    """
    logger.info("Cleaning up Redis keys")
    
    redis = get_redis()
    deleted_count = 0
    
    # Очистка старых статистик
    pattern = "watcher:stats:*"
    keys = redis.keys(pattern)
    
    for key in keys:
        # Проверка TTL
        ttl = redis.ttl(key)
        if ttl == -1:  # Нет TTL
            # Проверка возраста по содержимому
            data = redis.hgetall(key)
            if data:
                # Удаляем если старше 7 дней
                redis.delete(key)
                deleted_count += 1
    
    # Очистка orphan agent states
    pattern = "watcher:agent:*:state"
    keys = redis.keys(pattern)
    
    with get_db_session() as db:
        from sqlalchemy import select
        from app.models.watcher import WatcherAgent
        
        query = select(WatcherAgent.id)
        result = db.execute(query)
        valid_agent_ids = {str(row[0]) for row in result}
    
    for key in keys:
        # Извлечение agent_id из ключа
        parts = key.split(":")
        if len(parts) >= 3:
            agent_id = parts[2]
            if agent_id not in valid_agent_ids:
                redis.delete(key)
                deleted_count += 1
    
    logger.info(f"Deleted {deleted_count} Redis keys")
    
    return {
        "status": "success",
        "deleted_count": deleted_count
    }

7.7 Задачи расчёта приоритетов (v2.0)

calculate_priorities

# app/tasks/watcher/priorities.py

from celery import shared_task
from datetime import datetime, timedelta
from typing import Dict
import logging

from app.core.database import get_db_session


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.priorities.calculate_priorities",
    bind=True,
    max_retries=2,
    soft_time_limit=1800,  # 30 минут
    time_limit=2100  # 35 минут
)
def calculate_priorities(self):
    """
    Расчёт приоритетов мониторинга для v2.0.
    
    Запускается ежедневно в 04:00.
    Анализирует активность конкурентов и корректирует приоритеты.
    
    Факторы приоритета:
    - Частота изменения цен
    - Величина скидок
    - Близость цены к нашей
    - История демпинга
    """
    logger.info("Starting priority calculation")
    
    with get_db_session() as db:
        from sqlalchemy import select, update, func
        from app.models.watcher import (
            WatcherCompetitor,
            WatcherPriceHistory,
            WatcherAlert
        )
        
        # Получение всех активных конкурентов
        query = select(WatcherCompetitor).where(
            WatcherCompetitor.is_active == True
        )
        result = db.execute(query)
        competitors = result.scalars().all()
        
        updated_count = 0
        
        for competitor in competitors:
            new_priority = _calculate_competitor_priority(
                db=db,
                competitor=competitor
            )
            
            if new_priority != competitor.priority:
                competitor.priority = new_priority
                updated_count += 1
        
        db.commit()
        
        logger.info(f"Updated priorities for {updated_count} competitors")
        
        return {
            "status": "success",
            "updated_count": updated_count,
            "total_competitors": len(competitors)
        }


def _calculate_competitor_priority(db, competitor) -> int:
    """
    Расчёт приоритета для конкурента.
    
    Returns:
        Приоритет от 0 (низкий) до 100 (высокий)
    """
    from sqlalchemy import select, func, and_
    from app.models.watcher import WatcherPriceHistory, WatcherAlert
    
    score = 50  # Базовый приоритет
    
    # Период анализа: последние 30 дней
    since = datetime.utcnow() - timedelta(days=30)
    
    # 1. Частота изменения цен (+/- 20)
    price_changes = _count_price_changes(db, competitor.id, since)
    if price_changes >= 10:
        score += 20
    elif price_changes >= 5:
        score += 10
    elif price_changes == 0:
        score -= 10
    
    # 2. Величина скидок (+/- 15)
    avg_discount = _get_avg_discount(db, competitor.id, since)
    if avg_discount and avg_discount >= 30:
        score += 15
    elif avg_discount and avg_discount >= 15:
        score += 5
    
    # 3. Близость к нашей цене (+/- 15)
    price_diff_pct = _get_price_difference(db, competitor)
    if price_diff_pct is not None:
        if abs(price_diff_pct) <= 5:
            score += 15  # Очень близко
        elif abs(price_diff_pct) <= 15:
            score += 5  # Близко
        elif abs(price_diff_pct) > 50:
            score -= 10  # Далеко
    
    # 4. История демпинга (+20)
    dumping_alerts = _count_dumping_alerts(db, competitor.id, since)
    if dumping_alerts >= 2:
        score += 20
    elif dumping_alerts >= 1:
        score += 10
    
    # Нормализация в диапазон 0-100
    return max(0, min(100, score))


def _count_price_changes(db, competitor_id: str, since: datetime) -> int:
    """Подсчёт изменений цены."""
    from sqlalchemy import select, func
    from app.models.watcher import WatcherPriceHistory
    
    # Получаем историю цен
    query = select(WatcherPriceHistory.current_price).where(
        WatcherPriceHistory.competitor_id == competitor_id,
        WatcherPriceHistory.parsed_at >= since,
        WatcherPriceHistory.current_price.isnot(None)
    ).order_by(WatcherPriceHistory.parsed_at)
    
    result = db.execute(query)
    prices = [row[0] for row in result]
    
    if len(prices) < 2:
        return 0
    
    # Подсчёт изменений
    changes = 0
    for i in range(1, len(prices)):
        if prices[i] != prices[i-1]:
            changes += 1
    
    return changes


def _get_avg_discount(db, competitor_id: str, since: datetime) -> float | None:
    """Средний процент скидки."""
    from sqlalchemy import select, func
    from app.models.watcher import WatcherPriceHistory
    
    query = select(func.avg(WatcherPriceHistory.discount_percent)).where(
        WatcherPriceHistory.competitor_id == competitor_id,
        WatcherPriceHistory.parsed_at >= since,
        WatcherPriceHistory.discount_percent.isnot(None)
    )
    
    result = db.execute(query)
    avg = result.scalar()
    
    return float(avg) if avg else None


def _get_price_difference(db, competitor) -> float | None:
    """Разница в цене с нашим товаром (%)."""
    from sqlalchemy import select
    from app.models.watcher import WatcherPriceHistory
    
    # Последняя цена конкурента
    query = select(WatcherPriceHistory.current_price).where(
        WatcherPriceHistory.competitor_id == competitor.id,
        WatcherPriceHistory.current_price.isnot(None)
    ).order_by(WatcherPriceHistory.parsed_at.desc()).limit(1)
    
    result = db.execute(query)
    comp_price = result.scalar()
    
    if not comp_price:
        return None
    
    # Последняя наша цена
    query = select(WatcherPriceHistory.current_price).where(
        WatcherPriceHistory.sku == competitor.our_sku,
        WatcherPriceHistory.marketplace == competitor.marketplace,
        WatcherPriceHistory.competitor_id.is_(None),
        WatcherPriceHistory.current_price.isnot(None)
    ).order_by(WatcherPriceHistory.parsed_at.desc()).limit(1)
    
    result = db.execute(query)
    our_price = result.scalar()
    
    if not our_price:
        return None
    
    return ((comp_price - our_price) / our_price) * 100


def _count_dumping_alerts(db, competitor_id: str, since: datetime) -> int:
    """Количество алертов о демпинге."""
    from sqlalchemy import select, func
    from app.models.watcher import WatcherAlert
    
    query = select(func.count()).where(
        WatcherAlert.competitor_id == competitor_id,
        WatcherAlert.alert_type == "dumping_detected",
        WatcherAlert.created_at >= since
    )
    
    result = db.execute(query)
    return result.scalar() or 0

7.8 Задачи статистики

generate_daily_stats

# app/tasks/watcher/stats.py

from celery import shared_task
from datetime import datetime, date, timedelta
import logging

from app.core.database import get_db_session
from app.core.redis import get_redis


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.stats.generate_daily_stats",
    bind=True
)
def generate_daily_stats(self, target_date: str = None):
    """
    Генерация ежедневной статистики.
    
    Запускается в 08:00 для статистики за предыдущий день.
    """
    if target_date:
        stats_date = date.fromisoformat(target_date)
    else:
        stats_date = date.today() - timedelta(days=1)
    
    logger.info(f"Generating daily stats for {stats_date}")
    
    with get_db_session() as db:
        from sqlalchemy import select, func, and_
        from app.models.watcher import (
            WatcherTask, WatcherPriceHistory, WatcherAlert, TaskStatus
        )
        
        stats = {}
        
        # Статистика задач
        task_query = select(
            WatcherTask.status,
            func.count().label("count"),
            func.avg(WatcherTask.execution_time_ms).label("avg_time")
        ).where(
            WatcherTask.scheduled_date == stats_date
        ).group_by(WatcherTask.status)
        
        result = db.execute(task_query)
        task_stats = {row.status: {"count": row.count, "avg_time": row.avg_time} for row in result}
        
        stats["tasks"] = {
            "total": sum(t["count"] for t in task_stats.values()),
            "completed": task_stats.get(TaskStatus.COMPLETED.value, {}).get("count", 0),
            "failed": task_stats.get(TaskStatus.FAILED.value, {}).get("count", 0),
            "avg_time_ms": task_stats.get(TaskStatus.COMPLETED.value, {}).get("avg_time")
        }
        
        # Статистика по маркетплейсам
        mp_query = select(
            WatcherTask.marketplace,
            func.count().label("count")
        ).where(
            WatcherTask.scheduled_date == stats_date,
            WatcherTask.status == TaskStatus.COMPLETED.value
        ).group_by(WatcherTask.marketplace)
        
        result = db.execute(mp_query)
        stats["by_marketplace"] = {row.marketplace: row.count for row in result}
        
        # Статистика алертов
        alert_query = select(
            WatcherAlert.alert_type,
            func.count().label("count")
        ).where(
            func.date(WatcherAlert.created_at) == stats_date
        ).group_by(WatcherAlert.alert_type)
        
        result = db.execute(alert_query)
        stats["alerts"] = {row.alert_type: row.count for row in result}
        stats["alerts_total"] = sum(stats["alerts"].values())
        
        # Сохранение в Redis
        redis = get_redis()
        redis.hset(
            f"watcher:daily_stats:{stats_date.isoformat()}",
            mapping={k: str(v) if not isinstance(v, dict) else str(v) for k, v in stats.items()}
        )
        redis.expire(f"watcher:daily_stats:{stats_date.isoformat()}", 86400 * 90)  # 90 дней
        
        # Сохранение в PostgreSQL (опционально)
        # _save_stats_to_db(db, stats_date, stats)
        
        logger.info(f"Daily stats generated: {stats}")
        
        return {
            "status": "success",
            "date": stats_date.isoformat(),
            "stats": stats
        }


@shared_task(
    name="app.tasks.watcher.stats.generate_weekly_report",
    bind=True
)
def generate_weekly_report(self):
    """
    Генерация еженедельного отчёта.
    
    Запускается по понедельникам в 09:00.
    """
    logger.info("Generating weekly report")
    
    redis = get_redis()
    
    # Сбор статистики за неделю
    today = date.today()
    week_stats = []
    
    for i in range(7):
        day = today - timedelta(days=i+1)
        key = f"watcher:daily_stats:{day.isoformat()}"
        stats = redis.hgetall(key)
        if stats:
            week_stats.append(stats)
    
    if not week_stats:
        return {"status": "no_data"}
    
    # Агрегация
    report = {
        "period": f"{today - timedelta(days=7)}{today - timedelta(days=1)}",
        "total_tasks": sum(int(s.get("tasks", "0")) for s in week_stats),
        "total_alerts": sum(int(s.get("alerts_total", "0")) for s in week_stats),
        # ... дополнительная агрегация
    }
    
    # Отправка отчёта (опционально)
    # send_weekly_report_email.delay(report)
    
    return {
        "status": "success",
        "report": report
    }

7.9 Мониторинг Celery

Flower конфигурация

# docker-compose.yml (фрагмент)

services:
  flower:
    image: mher/flower:0.9.7
    command: celery flower --broker=redis://redis:6379/0 --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_BASIC_AUTH=admin:password
    depends_on:
      - redis

Health Check задача

# app/tasks/watcher/health.py

from celery import shared_task
import logging

from app.core.redis import get_redis
from app.core.database import get_db_session


logger = logging.getLogger(__name__)


@shared_task(
    name="app.tasks.watcher.health.health_check",
    bind=True
)
def health_check(self):
    """
    Проверка здоровья системы.
    
    Проверяет доступность всех компонентов.
    """
    checks = {}
    
    # Redis
    try:
        redis = get_redis()
        redis.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"
    
    # PostgreSQL
    try:
        with get_db_session() as db:
            db.execute("SELECT 1")
        checks["postgresql"] = "ok"
    except Exception as e:
        checks["postgresql"] = f"error: {e}"
    
    # Очереди
    try:
        redis = get_redis()
        for mp in ["wildberries", "ozon", "yandex_market"]:
            queue_len = redis.llen(f"watcher:task_queue:{mp}")
            checks[f"queue_{mp}"] = queue_len
    except Exception as e:
        checks["queues"] = f"error: {e}"
    
    all_ok = all(
        v == "ok" for k, v in checks.items() 
        if k in ["redis", "postgresql"]
    )
    
    return {
        "status": "healthy" if all_ok else "unhealthy",
        "checks": checks
    }

7.10 Запуск Workers

Docker Compose

# docker-compose.yml (фрагмент)

services:
  celery-worker-default:
    build: .
    command: celery -A app.core.celery_config worker -Q default -c 2 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://adolf:password@postgres:5432/adolf
    depends_on:
      - redis
      - postgres
    volumes:
      - ./app:/app/app
    restart: unless-stopped

  celery-worker-critical:
    build: .
    command: celery -A app.core.celery_config worker -Q critical -c 1 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://adolf:password@postgres:5432/adolf
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

  celery-worker-heavy:
    build: .
    command: celery -A app.core.celery_config worker -Q heavy -c 1 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://adolf:password@postgres:5432/adolf
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

  celery-beat:
    build: .
    command: celery -A app.core.celery_config beat --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://adolf:password@postgres:5432/adolf
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

Systemd Service (альтернатива)

# /etc/systemd/system/watcher-celery-worker.service

[Unit]
Description=Watcher Celery Worker
After=network.target redis.service postgresql.service

[Service]
Type=forking
User=adolf
Group=adolf
WorkingDirectory=/opt/adolf
ExecStart=/opt/adolf/venv/bin/celery -A app.core.celery_config worker -Q default,critical -c 3 --loglevel=info --detach
ExecStop=/opt/adolf/venv/bin/celery -A app.core.celery_config control shutdown
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

7.11 Расписание задач (сводка)

ЗадачаРасписаниеОчередьОписание
generate_daily_tasks20:30defaultГенерация задач на ночь
check_alerts*/15 минdefaultПроверка алертов
sync_agent_status*/1 минcriticalМониторинг агентов
cleanup_old_tasks03:00defaultОчистка задач (90 дней)
cleanup_old_logs03:30defaultОчистка логов (30 дней)
calculate_priorities04:00heavyРасчёт приоритетов
generate_daily_stats08:00defaultЕжедневная статистика
check_cookies_validity19:30criticalПроверка cookies
office_heartbeat*/1 минdefaultСтатус в Office Dashboard

Приложение А: Контрольные точки Celery

КритерийПроверка
Workers запущеныcelery -A app inspect active
Beat запущенЛоги показывают scheduled tasks
Очереди работаютcelery -A app inspect reserved
Задачи выполняютсяFlower dashboard
Генерация работаетЗадачи появляются в 20:30
Алерты проверяютсяЛоги каждые 15 минут
Очистка работаетСтарые данные удаляются
Office статусАгенты отображаются в Office Dashboard

Приложение B: Интеграция с Office Dashboard

B.1 Агенты Watcher

agent_idnamesalary_equivalentfte_coefficient
watcher_price_monitorМониторинг цен600001.0
watcher_night_agentНочной агент600000.5
watcher_competitor_scanСканер конкурентов600001.0

B.2 Инициализация репортеров

# app/tasks/watcher/office.py

from app.utils.office_reporter import OfficeReporter

# Репортеры для агентов Watcher
OFFICE_REPORTERS = {
    "price_monitor": OfficeReporter(
        agent_id="watcher_price_monitor",
        department="watcher",
        name="Мониторинг цен",
        salary_equivalent=60000,
        fte_coefficient=1.0
    ),
    "night_agent": OfficeReporter(
        agent_id="watcher_night_agent",
        department="watcher",
        name="Ночной агент",
        salary_equivalent=60000,
        fte_coefficient=0.5
    ),
    "competitor_scan": OfficeReporter(
        agent_id="watcher_competitor_scan",
        department="watcher",
        name="Сканер конкурентов",
        salary_equivalent=60000,
        fte_coefficient=1.0
    )
}

B.3 Интеграция в задачу generate_daily_tasks

# app/tasks/watcher/generation.py

from .office import OFFICE_REPORTERS

@shared_task
def generate_daily_tasks():
    reporter = OFFICE_REPORTERS["price_monitor"]
    
    try:
        reporter.report_working("Генерация задач на ночь")
        
        # ... логика генерации ...
        tasks_created = create_parsing_tasks()
        
        reporter.report_idle(metrics={
            "products_monitored": get_monitored_count(),
            "tasks_generated": len(tasks_created),
            "price_changes_today": get_daily_changes()
        })
        
        return {"success": True, "tasks": len(tasks_created)}
        
    except Exception as e:
        reporter.report_error(str(e))
        raise

B.4 Интеграция с ночным агентом

# Ночной агент активируется в 00:00-06:00

@shared_task
def night_parsing_batch():
    reporter = OFFICE_REPORTERS["night_agent"]
    
    try:
        reporter.report_working("Ночное сканирование цен")
        
        # ... логика ...
        
        reporter.report_idle(metrics={
            "products_scanned": scanned_count,
            "price_changes_found": changes_count
        })
        
    except Exception as e:
        reporter.report_error(str(e))
        raise

B.5 Heartbeat задача

# app/tasks/watcher/office.py

from celery import shared_task

@shared_task(name='watcher.tasks.office_heartbeat')
def office_heartbeat():
    """Обновление статуса агентов в Office Dashboard."""
    for reporter in OFFICE_REPORTERS.values():
        reporter.heartbeat()
    return {"success": True, "agents": len(OFFICE_REPORTERS)}

B.6 Celery Beat Schedule

# Добавить в beat_schedule:

"watcher-office-heartbeat": {
    "task": "watcher.tasks.office_heartbeat",
    "schedule": 60.0,  # Каждую минуту
    "options": {"queue": "default"}
},

B.7 Метрики для Office

МетрикаОписаниеИсточник
products_monitoredТоваров на мониторингеБД: COUNT active products
price_changes_todayИзменений цен за деньБД: COUNT changes WHERE date=today
tasks_generatedЗадач сгенерированоRedis: tasks counter
queue_sizeЗадач в очередиRedis: pending tasks

Документ подготовлен: Январь 2026
Версия: 2.1
Статус: Черновик