Module: Logistic
Component: Celery Background Tasks
Version: 2.0
Date: February 2026
Supersedes: adolf_logistic_8_celery_v1_0.md

8.1 Purpose

Celery runs the background tasks of the Logistic module:
  • Periodic sync of FBO stock levels with the Ozon Seller API
  • File-based stock import from 1C (XLSX/XML)
  • Daily generation of FBO supply tasks
  • Auto-cancellation of overdue tasks
  • Alert generation and cleanup of stale data

Changes v1.0 → v2.0

| v1.0 (Wildberries)    | v2.0 (Ozon + 1C)                             |
|-----------------------|----------------------------------------------|
| sync_stocks (WB API)  | sync_ozon_stocks (Ozon FBO clusters)         |
| sync_orders (WB API)  | Removed (data comes from Ozon Analytics)     |
| sync_tariffs (WB API) | Removed (not relevant for FBO)               |
| analyze_cross_docking | Removed (proactive model)                    |
| generate_forecasts    | generate_supply_tasks (supply tasks)         |
| cleanup_old_data      | cleanup_old_data (updated tables)            |
| -                     | sync_ozon_analytics (Ozon sales)             |
| -                     | sync_ozon_warehouses (clusters)              |
| -                     | import_1c_stocks (file import)               |
| -                     | auto_cancel_expired (task auto-cancellation) |
| -                     | generate_alerts (stock alerts)               |
| -                     | cleanup_import_archive (1C archive cleanup)  |

8.2 Architecture
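
A sketch of the task flow, reconstructed from the configuration in 8.3 and the worker layout in 8.8:

Celery Beat ──(beat_schedule)──▶ Redis broker (db 0)
                                      │
             ┌────────────────────────┼────────────────────────┐
             ▼                        ▼                        ▼
       logistic.sync          logistic.import           logistic.logic
       (Ozon API sync)        (1C file import)          (tasks, alerts,
             │                        │                   cleanup)
             └──────────▶ PostgreSQL + Redis cache ◀──────────┘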


8.3 Configuration

celery_config.py

import os

from celery import Celery
from celery.schedules import crontab

app = Celery(
    "logistic",
    # Read the broker/backend from the environment so the Docker Compose
    # settings in 8.8 apply inside containers; localhost is the dev fallback.
    # (CELERY_RESULT_BACKEND is an assumed variable name, mirroring the broker.)
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
)

app.conf.update(
    # Serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    
    # Timeouts
    task_time_limit=300,       # hard limit: 5 minutes
    task_soft_time_limit=240,  # SoftTimeLimitExceeded is raised first
    
    # Reliability: ack after completion, re-queue if the worker dies
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    
    # Queues
    task_default_queue="logistic",
    task_queues={
        "logistic.sync": {
            "exchange": "logistic",
            "routing_key": "logistic.sync",
        },
        "logistic.import": {
            "exchange": "logistic",
            "routing_key": "logistic.import",
        },
        "logistic.logic": {
            "exchange": "logistic",
            "routing_key": "logistic.logic",
        },
    },
    
    # Routing (glob patterns on task names)
    task_routes={
        "logistic.tasks.sync_ozon_*": {"queue": "logistic.sync"},
        "logistic.tasks.import_*": {"queue": "logistic.import"},
        "logistic.tasks.cleanup_import_*": {"queue": "logistic.import"},
        "logistic.tasks.generate_*": {"queue": "logistic.logic"},
        "logistic.tasks.auto_cancel_*": {"queue": "logistic.logic"},
        "logistic.tasks.cleanup_old_*": {"queue": "logistic.logic"},
    },
    
    # Schedule
    beat_schedule={
        # === Ozon sync ===
        "sync-ozon-stocks": {
            "task": "logistic.tasks.sync_ozon_stocks",
            "schedule": crontab(minute="*/30"),
        },
        "sync-ozon-analytics": {
            "task": "logistic.tasks.sync_ozon_analytics",
            "schedule": crontab(hour=5, minute=0),
        },
        "sync-ozon-warehouses": {
            "task": "logistic.tasks.sync_ozon_warehouses",
            "schedule": crontab(hour=2, minute=0, day_of_week=1),
        },
        
        # === 1C import ===
        "import-1c-stocks-morning": {
            "task": "logistic.tasks.import_1c_stocks",
            "schedule": crontab(hour=8, minute=0),
        },
        "import-1c-stocks-afternoon": {
            "task": "logistic.tasks.import_1c_stocks",
            "schedule": crontab(hour=14, minute=0),
        },
        "cleanup-import-archive": {
            "task": "logistic.tasks.cleanup_import_archive",
            "schedule": crontab(hour=4, minute=0),
        },
        
        # === Business logic ===
        "generate-supply-tasks": {
            "task": "logistic.tasks.generate_supply_tasks",
            "schedule": crontab(hour=7, minute=0),
        },
        "auto-cancel-expired": {
            "task": "logistic.tasks.auto_cancel_expired",
            "schedule": crontab(minute=0, hour="*/6"),
        },
        "generate-alerts": {
            "task": "logistic.tasks.generate_alerts",
            "schedule": crontab(minute="*/30"),
        },
        
        # === Maintenance ===
        "cleanup-old-data": {
            "task": "logistic.tasks.cleanup_old_data",
            "schedule": crontab(hour=4, minute=0, day_of_week=0),
        },
    },
)
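
Workers only execute tasks whose modules they have imported. A minimal sketch, assuming the per-task file layout shown in 8.4 (explicit imports here; autodiscovery would also work):

# Register the task modules at worker startup; the module paths are
# assumed to match the files in section 8.4.
app.conf.imports = (
    "logistic.tasks.sync_ozon_stocks",
    "logistic.tasks.sync_ozon_analytics",
    "logistic.tasks.sync_ozon_warehouses",
    "logistic.tasks.import_1c_stocks",
    "logistic.tasks.generate_supply_tasks",
    "logistic.tasks.auto_cancel_expired",
    "logistic.tasks.generate_alerts",
    "logistic.tasks.cleanup",
    "logistic.tasks.cleanup_import_archive",
)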

8.4 Tasks

8.4.1 sync_ozon_stocks

# tasks/sync_ozon_stocks.py
import structlog
from celery import shared_task

# Import path assumed; OzonAPIError is the adapter error type described
# in adolf_logistic_2 (Ozon Integration).
from logistic.adapters.ozon import OzonAPIError

logger = structlog.get_logger("logistic.tasks")


@shared_task(
    name="logistic.tasks.sync_ozon_stocks",
    bind=True,
    max_retries=3,
    default_retry_delay=120,
    autoretry_for=(OzonAPIError,),
)
def sync_ozon_stocks(self, brand_id: str = "all"):
    """
    Синхронизация остатков FBO с Ozon Seller API.
    
    Расписание: каждые 30 минут
    Источник: POST /v1/analytics/stock_on_warehouses
    Результат: cluster_stock_snapshots + Redis cache
    """
    import asyncio
    
    async def _sync():
        stock_service = get_stock_service()
        brands = resolve_brands(brand_id)
        results = {}
        
        for brand in brands:
            adapter = get_ozon_adapter(brand)
            
            # Fetch FBO stock per cluster
            raw_stocks = await adapter.get_stock_on_warehouses()
            
            # Enrich with sales velocity from analytics
            enriched = await stock_service.enrich_with_velocity(
                raw_stocks, brand
            )
            
            # Persist snapshots to the DB
            count = await stock_service.save_cluster_snapshots(
                enriched, brand
            )
            
            # Refresh the Redis cache
            await stock_service.update_cache(enriched, brand)
            
            results[brand] = {
                "snapshots_saved": count,
                "clusters": len(set(s.cluster_name for s in enriched)),
                "articles": len(set(s.article for s in enriched)),
            }
        
        return results
    
    result = asyncio.run(_sync())
    
    logger.info(
        "sync_ozon_stocks_completed",
        results=result
    )
    
    return {"status": "success", "results": result}

8.4.2 sync_ozon_analytics

# tasks/sync_ozon_analytics.py
from datetime import date, timedelta  # needed for the 28-day window below

@shared_task(
    name="logistic.tasks.sync_ozon_analytics",
    bind=True,
    max_retries=2,
    default_retry_delay=600,
)
def sync_ozon_analytics(self, brand_id: str = "all"):
    """
    Sync sales analytics from Ozon.
    
    Schedule: daily at 05:00
    Source: POST /v1/analytics/data (metrics: ordered_units)
    Output: refreshed avg_daily_sales in the snapshots,
            input data for DemandForecaster
    """
    import asyncio
    
    async def _sync():
        analytics_service = get_analytics_service()
        brands = resolve_brands(brand_id)
        results = {}
        
        for brand in brands:
            adapter = get_ozon_adapter(brand)
            
            # Sales per SKU over the last 28 full days
            sales_data = await adapter.get_analytics_data(
                metrics=["ordered_units"],
                dimensions=["sku"],
                date_from=date.today() - timedelta(days=28),
                date_to=date.today() - timedelta(days=1),
            )
            
            count = await analytics_service.save_sales_analytics(
                sales_data, brand
            )
            
            results[brand] = {"records_saved": count}
        
        return results
    
    result = asyncio.run(_sync())
    
    logger.info("sync_ozon_analytics_completed", results=result)
    return {"status": "success", "results": result}

8.4.3 sync_ozon_warehouses

# tasks/sync_ozon_warehouses.py
@shared_task(
    name="logistic.tasks.sync_ozon_warehouses",
    bind=True,
    max_retries=2,
)
def sync_ozon_warehouses(self):
    """
    Sync the list of FBO clusters from Ozon.
    
    Schedule: weekly (Monday 02:00)
    Source: POST /v1/warehouse/list
    Output: refreshed logistic_clusters
    """
    import asyncio
    
    async def _sync():
        warehouse_service = get_warehouse_service()
        total = 0
        
        for brand in ["ohana_market", "ohana_kids"]:
            adapter = get_ozon_adapter(brand)
            warehouses = await adapter.get_warehouses_list()
            # Accumulate across brands rather than keeping only the last count
            total += await warehouse_service.sync_clusters(warehouses)
        
        return {"clusters_synced": total}
    
    result = asyncio.run(_sync())
    
    logger.info("sync_ozon_warehouses_completed", results=result)
    return {"status": "success", "results": result}

8.4.4 import_1c_stocks

# tasks/import_1c_stocks.py
@shared_task(
    name="logistic.tasks.import_1c_stocks",
    bind=True,
    max_retries=2,
    default_retry_delay=600,
)
def import_1c_stocks(self):
    """
    Import internal-warehouse stock from 1C.
    
    Schedule: 08:00 and 14:00
    Source: XLSX/XML files in /data/imports/1c
    Output: warehouse_stocks + import_logs
    
    Details: adolf_logistic_5_1c_integration_v2_0.md
    """
    import asyncio
    
    async def _import():
        service = get_import_service()
        return await service.run_import()
    
    result = asyncio.run(_import())
    
    logger.info(
        "import_1c_stocks_completed",
        files_processed=result.files_processed,
        total_imported=result.total_imported,
        total_skipped=result.total_skipped
    )
    
    return {
        "status": "success",
        "files_processed": result.files_processed,
        "total_imported": result.total_imported,
        "total_skipped": result.total_skipped
    }

8.4.5 generate_supply_tasks

# tasks/generate_supply_tasks.py
@shared_task(
    name="logistic.tasks.generate_supply_tasks",
    bind=True,
    max_retries=1,
)
def generate_supply_tasks(self, brand_id: str = "all"):
    """
    Daily generation of FBO supply tasks.
    
    Schedule: daily at 07:00
    Logic: deficit = target_stock - fbo_stock - in_transit
    Output: supply_tasks + TASKS_GENERATED / WAREHOUSE_LOW alerts
    
    Details: adolf_logistic_4_supply_task_engine_v2_0.md
    """
    import asyncio
    
    async def _generate():
        task_gen = get_task_generator()
        supply_svc = get_supply_task_service()
        alert_svc = get_alert_service()
        brands = resolve_brands(brand_id)
        
        total_batch = {
            "total_tasks": 0,
            "total_quantity": 0,
            "urgent": 0,
            "planned": 0,
            "recommended": 0,
            "purchase_required": 0,
        }
        
        for brand in brands:
            # Compute deficits
            deficits = await task_gen.calculate_all_deficits(brand)
            
            # Generate tasks
            tasks = await task_gen.generate_tasks(deficits, brand)
            
            # Persist
            saved = await supply_svc.batch_create(tasks)
            
            total_batch["total_tasks"] += len(saved)
            total_batch["total_quantity"] += sum(t.quantity for t in saved)
            total_batch["urgent"] += sum(
                1 for t in saved if t.priority == "urgent"
            )
            total_batch["planned"] += sum(
                1 for t in saved if t.priority == "planned"
            )
            total_batch["recommended"] += sum(
                1 for t in saved if t.priority == "recommended"
            )
            
            # WAREHOUSE_LOW alerts
            purchase_needed = [
                d for d in deficits if d.coverage == "none"
            ]
            total_batch["purchase_required"] += len(purchase_needed)
            
            for item in purchase_needed:
                await alert_svc.create_alert(
                    alert_type="WAREHOUSE_LOW",
                    severity="high",
                    article=item.article,
                    message=(
                        f"Article {item.article} is out of stock in the 1C warehouse. "
                        f"FBO deficit: {item.deficit} pcs. "
                        f"Purchase required."
                    ),
                    brand_id=brand
                )
            
            # TASKS_GENERATED alert
            await alert_svc.create_alert(
                alert_type="TASKS_GENERATED",
                severity="low",
                message=(
                    f"Generated {len(saved)} supply tasks "
                    f"({sum(1 for t in saved if t.priority == 'urgent')} urgent)"
                ),
                brand_id=brand
            )
        
        return total_batch
    
    result = asyncio.run(_generate())
    
    logger.info(
        "generate_supply_tasks_completed",
        **result
    )
    
    return {"status": "success", **result}

8.4.6 auto_cancel_expired

# tasks/auto_cancel_expired.py
@shared_task(
    name="logistic.tasks.auto_cancel_expired",
)
def auto_cancel_expired(max_age_hours: int = 48):
    """
    Auto-cancel unconfirmed tasks older than 48 hours.
    
    Schedule: every 6 hours
    Cancels: status='new' AND created_at < NOW() - 48h
    Output: supply_tasks → status='cancelled' + TASK_OVERDUE alert
    """
    import asyncio
    
    async def _cancel():
        supply_svc = get_supply_task_service()
        alert_svc = get_alert_service()
        
        cancelled = await supply_svc.auto_cancel_expired(max_age_hours)
        
        for task in cancelled:
            await alert_svc.create_alert(
                alert_type="TASK_OVERDUE",
                severity="medium",
                article=task.article,
                cluster_name=task.cluster_name,
                message=(
                    f"Task {task.task_number} auto-cancelled "
                    f"(unconfirmed for > {max_age_hours}h)"
                ),
                brand_id=task.brand_id
            )
        
        return {"cancelled_count": len(cancelled)}
    
    result = asyncio.run(_cancel())
    
    if result["cancelled_count"] > 0:
        logger.warning(
            "auto_cancel_expired_completed",
            **result
        )
    
    return {"status": "success", **result}

8.4.7 generate_alerts

# tasks/generate_alerts.py
@shared_task(
    name="logistic.tasks.generate_alerts",
)
def generate_alerts(brand_id: str = "all"):
    """
    Generate alerts from current FBO stock levels.
    
    Schedule: every 30 minutes (after sync_ozon_stocks)
    Types: URGENT_STOCK, LOW_STOCK, OUT_OF_STOCK,
           OZON_URGENT, STOCK_RECOVERED
    
    Details: adolf_logistic_3_stock_monitor_v2_0.md
    """
    import asyncio
    
    async def _generate():
        stock_monitor = get_stock_monitor()
        brands = resolve_brands(brand_id)
        total = 0
        
        for brand in brands:
            alerts = await stock_monitor.check_and_generate_alerts(brand)
            total += len(alerts)
        
        return {"alerts_generated": total}
    
    result = asyncio.run(_generate())
    
    if result["alerts_generated"] > 0:
        logger.info("generate_alerts_completed", **result)
    
    return {"status": "success", **result}

8.4.8 cleanup_old_data

# tasks/cleanup.py
@shared_task(
    name="logistic.tasks.cleanup_old_data",
)
def cleanup_old_data(retention_days: int = 90):
    """
    Purge stale data.
    
    Schedule: weekly (Sunday 04:00)
    Deletes: FBO snapshots, 1C history, read alerts, and
             completed/cancelled tasks older than retention_days.
    
    Uses the logistic_cleanup_old_data(90) SQL function.
    """
    import asyncio
    
    async def _cleanup():
        db = get_db_session()
        result = await db.fetch_one(
            "SELECT * FROM logistic_cleanup_old_data(:days)",
            {"days": retention_days}
        )
        return dict(result)
    
    result = asyncio.run(_cleanup())
    
    logger.info(
        "cleanup_old_data_completed",
        retention_days=retention_days,
        **result
    )
    
    return {"status": "success", **result}

8.4.9 cleanup_import_archive

# tasks/cleanup_import_archive.py
@shared_task(
    name="logistic.tasks.cleanup_import_archive",
)
def cleanup_import_archive(days_to_keep: int = 90):
    """
    Delete archived 1C files older than 90 days.
    
    Schedule: daily at 04:00
    Directory: /data/imports/1c/archive
    
    Details: adolf_logistic_5_1c_integration_v2_0.md
    """
    from pathlib import Path
    from datetime import datetime, timedelta
    
    archive_dir = Path("/data/imports/1c/archive")
    
    if not archive_dir.exists():
        return {"status": "success", "deleted_files": 0}
    
    cutoff = datetime.now() - timedelta(days=days_to_keep)
    deleted = 0
    
    for f in archive_dir.iterdir():
        if f.is_file() and f.stat().st_mtime < cutoff.timestamp():
            f.unlink()
            deleted += 1
    
    if deleted > 0:
        logger.info(
            "cleanup_import_archive_completed",
            deleted_files=deleted
        )
    
    return {"status": "success", "deleted_files": deleted}

8.5 Task Schedule

Summary table

| Task                   | Cron         | Queue  | Retry    | Description               |
|------------------------|--------------|--------|----------|---------------------------|
| sync_ozon_stocks       | */30 * * * * | sync   | 3 × 120s | FBO stock per cluster     |
| sync_ozon_analytics    | 0 5 * * *    | sync   | 2 × 600s | 28-day sales              |
| sync_ozon_warehouses   | 0 2 * * 1    | sync   | 2        | Cluster list              |
| import_1c_stocks       | 0 8,14 * * * | import | 2 × 600s | 1C file import            |
| cleanup_import_archive | 0 4 * * *    | import | -        | Archive cleanup > 90 days |
| generate_supply_tasks  | 0 7 * * *    | logic  | 1        | Supply task generation    |
| auto_cancel_expired    | 0 */6 * * *  | logic  | -        | Auto-cancel NEW > 48h     |
| generate_alerts        | */30 * * * * | logic  | -        | Stock alerts              |
| cleanup_old_data       | 0 4 * * 0    | logic  | -        | DB cleanup > 90 days      |

Execution order (daily timeline)

02:00  sync_ozon_warehouses (Mondays)
04:00  cleanup_old_data (Sundays)
04:00  cleanup_import_archive
05:00  sync_ozon_analytics
07:00  generate_supply_tasks ← depends on fresh stocks + 1C data
08:00  import_1c_stocks (morning)
14:00  import_1c_stocks (afternoon)
*/30   sync_ozon_stocks → generate_alerts (chained)
*/6h   auto_cancel_expired

8.6 Task Dependencies

Critical chain for generate_supply_tasks (07:00):
  1. sync_ozon_stocks — fresh FBO stock (latest run at 06:30)
  2. sync_ozon_analytics — velocity from Ozon (05:00)
  3. import_1c_stocks — 1C stock (the previous day's 14:00 run; the 08:00
     morning import finishes only after generation)
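
The sync_ozon_stocks → generate_alerts pairing in the timeline relies on both tasks being scheduled at */30; it can be made explicit with a Celery chain instead. A minimal sketch:

from celery import chain

# Run alert generation strictly after the stock sync; .si() makes the
# second signature immutable so the sync result is not passed into it.
chain(
    sync_ozon_stocks.s(brand_id="all"),
    generate_alerts.si(brand_id="all"),
).apply_async()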

8.7 Monitoring

Prometheus metrics

from prometheus_client import Counter, Histogram, Gauge

# Counters
task_runs = Counter(
    "logistic_celery_task_runs_total",
    "Total task runs",
    ["task_name", "status"]
)

# Execution time
task_duration = Histogram(
    "logistic_celery_task_duration_seconds",
    "Task execution time",
    ["task_name"],
    buckets=[1, 5, 10, 30, 60, 120, 300]
)

# Active tasks
active_tasks = Gauge(
    "logistic_celery_active_tasks",
    "Currently running tasks",
    ["task_name"]
)

# Domain-specific metrics
supply_tasks_generated = Counter(
    "logistic_supply_tasks_generated_total",
    "Supply tasks generated by priority",
    ["priority"]
)

ozon_api_calls = Counter(
    "logistic_ozon_api_calls_total",
    "Ozon API calls by endpoint",
    ["endpoint", "status"]
)

import_1c_records = Counter(
    "logistic_1c_import_records_total",
    "1C import records",
    ["status"]  # imported / skipped / error
)
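
The metrics above are declared, but this document does not show them being recorded. One conventional way is Celery signals; a minimal sketch (the handler wiring is an assumption, and each worker process still needs to expose the metrics, e.g. via prometheus_client.start_http_server):

import time
from celery import signals

_start_times = {}

@signals.task_prerun.connect
def _on_task_start(task_id=None, task=None, **kwargs):
    _start_times[task_id] = time.monotonic()
    active_tasks.labels(task_name=task.name).inc()

@signals.task_postrun.connect
def _on_task_end(task_id=None, task=None, state=None, **kwargs):
    active_tasks.labels(task_name=task.name).dec()
    task_runs.labels(task_name=task.name, status=state or "UNKNOWN").inc()
    started = _start_times.pop(task_id, None)
    if started is not None:
        task_duration.labels(task_name=task.name).observe(
            time.monotonic() - started
        )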

Flower

celery -A logistic.celery_config flower --port=5555

8.8 Startup

Workers

# Sync worker (Ozon API)
celery -A logistic.celery_config worker \
    -Q logistic.sync -c 2 \
    --loglevel=INFO -n sync@%h

# Import worker (1C files)
celery -A logistic.celery_config worker \
    -Q logistic.import -c 1 \
    --loglevel=INFO -n import@%h

# Logic worker (supply tasks, alerts)
celery -A logistic.celery_config worker \
    -Q logistic.logic -c 2 \
    --loglevel=INFO -n logic@%h

Beat

celery -A logistic.celery_config beat --loglevel=INFO

Docker Compose

services:
  logistic-worker-sync:
    build: .
    command: >
      celery -A logistic.celery_config worker 
      -Q logistic.sync -c 2 -n sync@%h
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - LOGISTIC_OZON_CLIENT_ID=${OZON_CLIENT_ID}
      - LOGISTIC_OZON_API_KEY=${OZON_API_KEY}
    depends_on:
      - redis
      - postgres
  
  logistic-worker-import:
    build: .
    command: >
      celery -A logistic.celery_config worker
      -Q logistic.import -c 1 -n import@%h
    environment:
      # Every worker needs the broker URL, not only the sync worker
      - CELERY_BROKER_URL=redis://redis:6379/0
    volumes:
      - import_data:/data/imports/1c
    depends_on:
      - redis
      - postgres
  
  logistic-worker-logic:
    build: .
    command: >
      celery -A logistic.celery_config worker
      -Q logistic.logic -c 2 -n logic@%h
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis
      - postgres
  
  logistic-beat:
    build: .
    command: celery -A logistic.celery_config beat
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  import_data:
    driver: local

8.9 Manual Triggering

Via the API

from fastapi import APIRouter, Depends, HTTPException

# require_role and the task import paths are assumed; adjust to the
# actual project layout.
from logistic.auth import require_role
from logistic.tasks import (
    sync_ozon_stocks, sync_ozon_analytics, sync_ozon_warehouses,
    import_1c_stocks, generate_supply_tasks, auto_cancel_expired,
    generate_alerts, cleanup_old_data,
)

router = APIRouter(prefix="/logistic/tasks", tags=["Logistic Tasks"])

ALLOWED_TASKS = {
    "sync_ozon_stocks": sync_ozon_stocks,
    "sync_ozon_analytics": sync_ozon_analytics,
    "sync_ozon_warehouses": sync_ozon_warehouses,
    "import_1c_stocks": import_1c_stocks,
    "generate_supply_tasks": generate_supply_tasks,
    "auto_cancel_expired": auto_cancel_expired,
    "generate_alerts": generate_alerts,
    "cleanup_old_data": cleanup_old_data,
}


@router.post("/{task_name}/run")
async def run_task(
    task_name: str,
    user=Depends(require_role(["admin", "director"]))
) -> dict:
    """Ручной запуск Celery задачи."""
    if task_name not in ALLOWED_TASKS:
        raise HTTPException(404, f"Task '{task_name}' not found")
    
    result = ALLOWED_TASKS[task_name].delay()
    return {"task_id": result.id, "task_name": task_name, "status": "queued"}


@router.get("/{task_id}/status")
async def get_task_status(task_id: str) -> dict:
    """Статус выполнения задачи."""
    from celery.result import AsyncResult
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None
    }
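
Example invocation (host and port are illustrative):

curl -X POST http://localhost:8000/logistic/tasks/sync_ozon_stocks/run
curl http://localhost:8000/logistic/tasks/<task_id>/status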

8.10 Prompt for Claude Code

Implement the Celery tasks for the Logistic module v2.0 per
adolf_logistic_8_celery_v2_0.md

Requirements:
1. Configuration: 3 queues (sync, import, logic) with routing
2. Tasks (9 in total):
   - sync_ozon_stocks (every 30 min, retry 3×120s)
   - sync_ozon_analytics (05:00, retry 2×600s)
   - sync_ozon_warehouses (Mon 02:00)
   - import_1c_stocks (08:00 + 14:00, retry 2×600s)
   - generate_supply_tasks (07:00)
   - auto_cancel_expired (every 6h, cancel NEW > 48h)
   - generate_alerts (every 30 min, after sync_ozon_stocks)
   - cleanup_old_data (Sun 04:00, SQL function)
   - cleanup_import_archive (04:00, files > 90 days)
3. Docker Compose: 3 workers + beat
4. API: manual trigger + task status
5. Metrics: Prometheus counters/histograms

Dependencies: celery, redis, structlog, prometheus_client

8.11 Related Documents

| Document              | Description              |
|-----------------------|--------------------------|
| 1. Architecture       | Architecture, data flows |
| 2. Ozon Integration   | Ozon API endpoints       |
| 3. Stock Monitor      | Stock alerts             |
| 4. Supply Task Engine | Supply task generation   |
| 5. 1C Integration     | File import              |
| 6. Database           | Tables, cleanup function |

Prepared: February 2026
Version: 2.0
Status: Draft
Supersedes: adolf_logistic_8_celery_v1_0.md