Skip to main content
Проект: Интеллектуальная система управления логистикой маркетплейсов
Модуль: Logistic / 1С Integration
Версия: 2.0
Дата: Февраль 2026
Заменяет: adolf_logistic_5_recommendation_engine_v1_0.md

5.1 Назначение

Описание

1С Integration — компонент модуля Logistic, отвечающий за:
  • Импорт остатков внутреннего склада из 1С (файловый обмен XLSX/XML)
  • Маппинг артикулов 1С ↔ Ozon offer_id ↔ ozon_sku
  • Валидацию и нормализацию импортируемых данных
  • Отслеживание истории импортов и обнаружение аномалий
  • Экспорт наряд-заданий в формат, совместимый с 1С
  • Архивирование обработанных файлов

Роль в системе

Отличие от Recommendation Engine v1.0

Параметрv1.0 (Recommendation Engine)v2.0 (1С Integration)
ЗадачаРасчёт распределения поставок по складам WBИмпорт/экспорт данных между ADOLF и 1С
ВходСпрос по регионам + коэфф. приёмки WBФайлы XLSX/XML из 1С
ВыходРекомендация по распределениюАктуальные остатки внутреннего склада
МаркетплейсWildberriesНе зависит (внутренний склад)
СтатусУдалёнНовый компонент
Примечание: Логика расчёта распределения перенесена в Supply Task Engine (раздел 4). 1С Integration — чисто интеграционный модуль.

5.2 Формат файла 1С

Ожидаемая структура XLSX

КолонкаТипОбязательноеОписание
АртикулstringВнутренний артикул товара (= offer_id в Ozon)
НаименованиеstringНазвание товара
ОстатокintКоличество на складе (шт)
ЕдиницаstringЕдиница измерения (по умолчанию «шт»)
ШтрихкодstringEAN-13 (для дополнительного маппинга)

Ожидаемая структура XML

<?xml version="1.0" encoding="UTF-8"?>
<ОстаткиСклада Дата="2026-02-06" Склад="Основной">
  <Товар>
    <Артикул>51005/54</Артикул>
    <Наименование>Платье миди зелёное</Наименование>
    <Остаток>340</Остаток>
    <Единица>шт</Единица>
  </Товар>
  <Товар>
    <Артикул>K-20115</Артикул>
    <Наименование>Комбинезон детский</Наименование>
    <Остаток>120</Остаток>
    <Единица>шт</Единица>
  </Товар>
</ОстаткиСклада>

Конфигурация

@dataclass
class ImportConfig:
    """Конфигурация импорта 1С."""
    
    # Директория для сканирования
    import_dir: str = "/data/1c-import"
    
    # Директория для архива
    archive_dir: str = "/data/1c-archive"
    
    # Допустимые форматы
    allowed_extensions: list[str] = field(
        default_factory=lambda: [".xlsx", ".xml"]
    )
    
    # XLSX: номера колонок (0-based) или названия
    xlsx_article_col: int | str = 0      # "Артикул"
    xlsx_name_col: int | str = 1         # "Наименование"
    xlsx_quantity_col: int | str = 2     # "Остаток"
    xlsx_barcode_col: int | str | None = 4  # "Штрихкод" (опционально)
    xlsx_header_row: int = 0             # строка заголовков
    
    # Валидация
    max_quantity: int = 100_000          # максимально допустимый остаток
    min_rows: int = 1                    # минимум строк для валидного файла
    max_rows: int = 50_000              # максимум строк
    
    # Аномалии
    anomaly_threshold_pct: float = 50.0  # % изменения → предупреждение

5.3 FileImportAdapter

Парсинг файлов

import structlog
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime

logger = structlog.get_logger("logistic.1c_import")


@dataclass
class ParsedStockRow:
    """Одна строка импортированных данных."""
    article: str
    product_name: str
    quantity: int
    unit: str = "шт"
    barcode: str | None = None
    row_number: int = 0


@dataclass
class ParseResult:
    """Результат парсинга файла."""
    rows: list[ParsedStockRow]
    file_name: str
    file_format: str         # "xlsx" | "xml"
    file_date: datetime | None  # дата из файла (если указана)
    parse_errors: list[str]
    total_rows: int
    valid_rows: int


class FileImportAdapter:
    """Адаптер для парсинга файлов 1С (XLSX/XML)."""
    
    def __init__(self, config: ImportConfig):
        self.config = config
    
    def parse(self, file_path: str | Path) -> ParseResult:
        """
        Парсинг файла. Автоматическое определение формата.
        
        Raises:
            UnsupportedFormatError: неподдерживаемый формат
            EmptyFileError: файл пуст
        """
        path = Path(file_path)
        ext = path.suffix.lower()
        
        if ext == ".xlsx":
            return self._parse_xlsx(path)
        elif ext == ".xml":
            return self._parse_xml(path)
        else:
            raise UnsupportedFormatError(
                f"Формат {ext} не поддерживается. "
                f"Допустимые: {self.config.allowed_extensions}"
            )
    
    def _parse_xlsx(self, path: Path) -> ParseResult:
        """Парсинг XLSX через openpyxl."""
        from openpyxl import load_workbook
        
        wb = load_workbook(path, read_only=True, data_only=True)
        ws = wb.active
        
        rows: list[ParsedStockRow] = []
        errors: list[str] = []
        total = 0
        
        for i, row in enumerate(ws.iter_rows(
            min_row=self.config.xlsx_header_row + 2,  # skip header
            values_only=True
        )):
            total += 1
            row_num = i + self.config.xlsx_header_row + 2
            
            try:
                article = self._clean_string(row[self._col_idx("article")])
                name = self._clean_string(row[self._col_idx("name")])
                qty = self._parse_quantity(row[self._col_idx("quantity")])
                barcode = self._safe_get(row, self._col_idx("barcode"))
                
                if not article:
                    errors.append(f"Строка {row_num}: пустой артикул")
                    continue
                
                if qty < 0:
                    errors.append(f"Строка {row_num}: отрицательный остаток ({qty})")
                    continue
                
                if qty > self.config.max_quantity:
                    errors.append(
                        f"Строка {row_num}: остаток {qty} > max ({self.config.max_quantity})"
                    )
                    continue
                
                rows.append(ParsedStockRow(
                    article=article,
                    product_name=name or "",
                    quantity=qty,
                    barcode=str(barcode) if barcode else None,
                    row_number=row_num
                ))
                
            except Exception as e:
                errors.append(f"Строка {row_num}: {str(e)}")
        
        wb.close()
        
        return ParseResult(
            rows=rows,
            file_name=path.name,
            file_format="xlsx",
            file_date=None,
            parse_errors=errors,
            total_rows=total,
            valid_rows=len(rows)
        )
    
    def _parse_xml(self, path: Path) -> ParseResult:
        """Парсинг XML через lxml."""
        from lxml import etree
        
        tree = etree.parse(str(path))
        root = tree.getroot()
        
        # Получаем дату из атрибута корневого элемента
        file_date = None
        date_str = root.get("Дата") or root.get("date")
        if date_str:
            try:
                file_date = datetime.strptime(date_str, "%Y-%m-%d")
            except ValueError:
                pass
        
        rows: list[ParsedStockRow] = []
        errors: list[str] = []
        total = 0
        
        for i, item in enumerate(root.findall(".//Товар")):
            total += 1
            row_num = i + 1
            
            try:
                article = self._clean_string(
                    item.findtext("Артикул") or item.findtext("Article")
                )
                name = self._clean_string(
                    item.findtext("Наименование") or item.findtext("Name")
                )
                qty_text = item.findtext("Остаток") or item.findtext("Quantity")
                qty = self._parse_quantity(qty_text)
                
                if not article:
                    errors.append(f"Товар #{row_num}: пустой артикул")
                    continue
                
                rows.append(ParsedStockRow(
                    article=article,
                    product_name=name or "",
                    quantity=qty,
                    row_number=row_num
                ))
                
            except Exception as e:
                errors.append(f"Товар #{row_num}: {str(e)}")
        
        return ParseResult(
            rows=rows,
            file_name=path.name,
            file_format="xml",
            file_date=file_date,
            parse_errors=errors,
            total_rows=total,
            valid_rows=len(rows)
        )
    
    def _clean_string(self, value) -> str | None:
        if value is None:
            return None
        return str(value).strip()
    
    def _parse_quantity(self, value) -> int:
        if value is None:
            return 0
        return int(float(str(value).replace(",", ".").strip()))
    
    def _col_idx(self, field: str) -> int:
        mapping = {
            "article": self.config.xlsx_article_col,
            "name": self.config.xlsx_name_col,
            "quantity": self.config.xlsx_quantity_col,
            "barcode": self.config.xlsx_barcode_col,
        }
        col = mapping.get(field)
        if col is None:
            return -1
        if isinstance(col, int):
            return col
        return col  # имя колонки — lookup по заголовку
    
    def _safe_get(self, row: tuple, idx: int):
        if idx < 0 or idx >= len(row):
            return None
        return row[idx]

5.4 ImportService

Основной сервис

Реализация

class ImportService:
    """Сервис импорта данных из 1С."""
    
    def __init__(
        self,
        adapter: FileImportAdapter,
        import_repo: ImportRepository,
        mapping_repo: SKUMappingRepository,
        alert_service: AlertService,
        config: ImportConfig
    ):
        self.adapter = adapter
        self.import_repo = import_repo
        self.mapping = mapping_repo
        self.alert_service = alert_service
        self.config = config
    
    async def scan_and_import(self) -> list[ImportResult]:
        """
        Сканирование директории и импорт новых файлов.
        Celery task: по расписанию (1-2 раза в день).
        """
        import_dir = Path(self.config.import_dir)
        if not import_dir.exists():
            logger.warning("import_dir_not_found", path=str(import_dir))
            return []
        
        results = []
        
        for file_path in sorted(import_dir.iterdir()):
            if file_path.suffix.lower() not in self.config.allowed_extensions:
                continue
            if file_path.name.startswith("."):
                continue
            
            # Проверяем, не обработан ли уже
            if await self.import_repo.is_file_processed(file_path.name):
                continue
            
            result = await self.import_file(file_path)
            results.append(result)
        
        return results
    
    async def import_file(
        self, file_path: str | Path
    ) -> ImportResult:
        """
        Импорт одного файла.
        
        Может вызываться:
        - Автоматически (scan_and_import)
        - Вручную (через API / Open WebUI)
        """
        path = Path(file_path)
        import_id = f"imp_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{path.stem}"
        
        logger.info("file_import_started", file=path.name, import_id=import_id)
        
        try:
            # 1. Парсинг
            parsed = self.adapter.parse(path)
            
            if not parsed.rows:
                return await self._log_failure(
                    import_id, path.name, "Файл не содержит валидных строк",
                    parsed.parse_errors
                )
            
            # 2. Маппинг артикулов
            mapped_rows, unmapped = await self._map_articles(parsed.rows)
            
            # 3. Обнаружение аномалий
            anomalies = await self._detect_anomalies(mapped_rows)
            
            # 4. Upsert в БД
            await self.import_repo.upsert_warehouse_stocks(
                import_id=import_id,
                stocks=[
                    WarehouseStockRecord(
                        article=row.article,
                        product_name=row.product_name,
                        warehouse_stock=row.quantity,
                        import_id=import_id,
                        brand_id=self._detect_brand(row.article)
                    )
                    for row in mapped_rows
                ]
            )
            
            # 5. Логирование
            import_log = ImportLog(
                import_id=import_id,
                file_name=path.name,
                file_format=parsed.file_format,
                status="success",
                total_rows=parsed.total_rows,
                valid_rows=parsed.valid_rows,
                unmapped_count=len(unmapped),
                anomaly_count=len(anomalies),
                parse_errors=parsed.parse_errors,
                unmapped_articles=unmapped,
                imported_at=datetime.now()
            )
            await self.import_repo.save_log(import_log)
            
            # 6. Алерты
            if anomalies:
                await self._alert_anomalies(import_id, anomalies)
            if unmapped:
                logger.warning(
                    "unmapped_articles",
                    count=len(unmapped),
                    articles=unmapped[:10]
                )
            
            # 7. Архивирование
            self._archive_file(path)
            
            logger.info(
                "file_import_completed",
                import_id=import_id,
                valid=parsed.valid_rows,
                unmapped=len(unmapped),
                anomalies=len(anomalies)
            )
            
            return ImportResult(
                import_id=import_id,
                success=True,
                valid_rows=parsed.valid_rows,
                unmapped=unmapped,
                anomalies=anomalies
            )
            
        except Exception as e:
            logger.error("file_import_failed", file=path.name, error=str(e))
            return await self._log_failure(import_id, path.name, str(e))
    
    async def _map_articles(
        self, rows: list[ParsedStockRow]
    ) -> tuple[list[ParsedStockRow], list[str]]:
        """Проверка артикулов по маппингу SKU."""
        known_articles = await self.mapping.get_all_articles()
        
        mapped = []
        unmapped = []
        
        for row in rows:
            if row.article in known_articles:
                mapped.append(row)
            else:
                unmapped.append(row.article)
        
        return mapped, unmapped
    
    async def _detect_anomalies(
        self, rows: list[ParsedStockRow]
    ) -> list[dict]:
        """Обнаружение аномальных изменений остатков."""
        anomalies = []
        threshold = self.config.anomaly_threshold_pct
        
        for row in rows:
            prev = await self.import_repo.get_previous_stock(row.article)
            if prev is None:
                continue
            
            if prev == 0:
                if row.quantity > 0:
                    continue  # нормально: было 0, стало > 0
            else:
                change_pct = abs(row.quantity - prev) / prev * 100
                if change_pct >= threshold:
                    anomalies.append({
                        "article": row.article,
                        "previous": prev,
                        "current": row.quantity,
                        "change_pct": round(change_pct, 1)
                    })
        
        return anomalies
    
    def _archive_file(self, path: Path) -> None:
        """Перемещение файла в архив."""
        archive_dir = Path(self.config.archive_dir)
        month_dir = archive_dir / datetime.now().strftime("%Y-%m")
        month_dir.mkdir(parents=True, exist_ok=True)
        
        timestamp = datetime.now().strftime("%H%M%S")
        archive_name = f"{timestamp}_{path.name}"
        path.rename(month_dir / archive_name)
    
    async def _log_failure(
        self, import_id: str, file_name: str, error: str,
        parse_errors: list[str] | None = None
    ) -> ImportResult:
        log = ImportLog(
            import_id=import_id,
            file_name=file_name,
            file_format="unknown",
            status="error",
            total_rows=0, valid_rows=0,
            unmapped_count=0, anomaly_count=0,
            parse_errors=parse_errors or [error],
            imported_at=datetime.now()
        )
        await self.import_repo.save_log(log)
        return ImportResult(import_id=import_id, success=False, error=error)
    
    async def _alert_anomalies(
        self, import_id: str, anomalies: list[dict]
    ) -> None:
        await self.alert_service.create_alert(StockAlert(
            id=uuid4(),
            type=AlertType.IMPORT_ANOMALY,
            severity=AlertSeverity.MEDIUM,
            article="IMPORT",
            ozon_sku=None,
            cluster_name="WAREHOUSE",
            message=f"Импорт {import_id}: {len(anomalies)} аномалий "
                    f"(изменение > {self.config.anomaly_threshold_pct}%)",
            details={"import_id": import_id, "anomalies": anomalies[:20]},
            brand_id="all",
            created_at=datetime.now()
        ))
    
    def _detect_brand(self, article: str) -> str:
        return "ohana_kids" if article.startswith("K") else "ohana_market"

5.5 Модель данных

Записи импорта

@dataclass
class WarehouseStockRecord:
    """Запись остатка на внутреннем складе."""
    article: str
    product_name: str
    warehouse_stock: int
    import_id: str
    brand_id: str
    updated_at: datetime = field(default_factory=datetime.now)


@dataclass
class ImportLog:
    """Лог импорта файла."""
    import_id: str
    file_name: str
    file_format: str           # xlsx / xml
    status: str                # success / error
    total_rows: int
    valid_rows: int
    unmapped_count: int
    anomaly_count: int
    parse_errors: list[str]
    unmapped_articles: list[str] = field(default_factory=list)
    imported_at: datetime = field(default_factory=datetime.now)


@dataclass
class ImportResult:
    """Результат операции импорта."""
    import_id: str
    success: bool
    valid_rows: int = 0
    unmapped: list[str] = field(default_factory=list)
    anomalies: list[dict] = field(default_factory=list)
    error: str | None = None

SKU Mapping

@dataclass
class SKUMapping:
    """Маппинг артикулов между системами."""
    article: str          # артикул 1С (= offer_id Ozon)
    ozon_sku: int | None  # SKU Ozon (числовой)
    product_name: str
    brand_id: str
    is_active: bool = True
    updated_at: datetime = field(default_factory=datetime.now)


class SKUMappingRepository:
    """Репозиторий маппинга артикулов."""
    
    async def get_all_articles(self) -> set[str]:
        """Все известные артикулы."""
        query = "SELECT article FROM sku_mapping WHERE is_active = true"
        rows = await self.db.fetch_all(query)
        return {row["article"] for row in rows}
    
    async def get_by_article(self, article: str) -> SKUMapping | None:
        query = "SELECT * FROM sku_mapping WHERE article = :article"
        return await self.db.fetch_one(query, {"article": article})
    
    async def get_by_ozon_sku(self, ozon_sku: int) -> SKUMapping | None:
        query = "SELECT * FROM sku_mapping WHERE ozon_sku = :sku"
        return await self.db.fetch_one(query, {"sku": ozon_sku})
    
    async def upsert(self, mapping: SKUMapping) -> None:
        """Создать или обновить маппинг."""
        query = """
            INSERT INTO sku_mapping (article, ozon_sku, product_name, brand_id, is_active, updated_at)
            VALUES (:article, :ozon_sku, :name, :brand_id, :is_active, NOW())
            ON CONFLICT (article) DO UPDATE SET
                ozon_sku = EXCLUDED.ozon_sku,
                product_name = EXCLUDED.product_name,
                is_active = EXCLUDED.is_active,
                updated_at = NOW()
        """
        await self.db.execute(query, {
            "article": mapping.article,
            "ozon_sku": mapping.ozon_sku,
            "name": mapping.product_name,
            "brand_id": mapping.brand_id,
            "is_active": mapping.is_active
        })
    
    async def sync_from_ozon(self, ozon_products: list[dict]) -> int:
        """
        Синхронизация маппинга из каталога Ozon.
        Вызывается при первой настройке и периодически.
        """
        count = 0
        for product in ozon_products:
            await self.upsert(SKUMapping(
                article=product["offer_id"],
                ozon_sku=product["sku"],
                product_name=product["name"],
                brand_id="ohana_kids" if product["offer_id"].startswith("K") else "ohana_market",
                is_active=product.get("is_visible", True)
            ))
            count += 1
        return count

5.6 Экспорт для 1С

Генерация файлов наряд-заданий

class ExportService:
    """Экспорт данных из ADOLF для 1С."""
    
    def __init__(
        self,
        task_repo: SupplyTaskRepository,
        config: ImportConfig
    ):
        self.task_repo = task_repo
        self.export_dir = Path(config.import_dir).parent / "1c-export"
        self.export_dir.mkdir(parents=True, exist_ok=True)
    
    async def export_supply_tasks_xlsx(
        self,
        date: datetime | None = None,
        status: str | None = None
    ) -> Path:
        """
        Экспорт наряд-заданий в XLSX для 1С.
        
        Формирует файл, который кладовщик может
        использовать для сборки отгрузки.
        """
        from openpyxl import Workbook
        from openpyxl.styles import Font, Alignment, PatternFill
        
        tasks = await self.task_repo.get_tasks(
            date=date or datetime.now(),
            status=status
        )
        
        wb = Workbook()
        ws = wb.active
        ws.title = "Наряд-задания"
        
        # Заголовки
        headers = [
            "№ задания", "Артикул", "Наименование",
            "Количество (шт)", "Кластер Ozon", "Приоритет",
            "Статус", "Дней до обнуления", "FBO остаток"
        ]
        
        header_font = Font(bold=True)
        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=header)
            cell.font = header_font
        
        # Данные
        priority_fill = {
            "urgent": PatternFill(fgColor="FFCCCC", fill_type="solid"),
            "planned": PatternFill(fgColor="FFFFCC", fill_type="solid"),
        }
        
        for i, task in enumerate(tasks, 2):
            ws.cell(row=i, column=1, value=task.task_number)
            ws.cell(row=i, column=2, value=task.article)
            ws.cell(row=i, column=3, value=task.product_name)
            ws.cell(row=i, column=4, value=task.quantity)
            ws.cell(row=i, column=5, value=task.cluster_name)
            ws.cell(row=i, column=6, value=task.priority.value)
            ws.cell(row=i, column=7, value=task.status.value)
            ws.cell(row=i, column=8, value=task.days_to_zero)
            ws.cell(row=i, column=9, value=task.fbo_stock)
            
            # Цветовая маркировка приоритета
            fill = priority_fill.get(task.priority.value)
            if fill:
                for col in range(1, len(headers) + 1):
                    ws.cell(row=i, column=col).fill = fill
        
        # Автоширина
        for col in ws.columns:
            max_len = max(len(str(cell.value or "")) for cell in col)
            ws.column_dimensions[col[0].column_letter].width = min(max_len + 2, 40)
        
        # Сохранение
        date_str = (date or datetime.now()).strftime("%Y-%m-%d")
        file_name = f"supply_tasks_{date_str}.xlsx"
        file_path = self.export_dir / file_name
        wb.save(str(file_path))
        
        return file_path

5.7 ImportRepository

Методы репозитория

class ImportRepository:
    """Репозиторий для работы с данными импорта 1С."""
    
    async def upsert_warehouse_stocks(
        self, import_id: str, stocks: list[WarehouseStockRecord]
    ) -> int:
        """Upsert остатков. Возвращает количество обработанных."""
        query = """
            INSERT INTO warehouse_stocks (article, product_name, warehouse_stock, import_id, brand_id, updated_at)
            VALUES (:article, :product_name, :warehouse_stock, :import_id, :brand_id, NOW())
            ON CONFLICT (article) DO UPDATE SET
                product_name = EXCLUDED.product_name,
                warehouse_stock = EXCLUDED.warehouse_stock,
                import_id = EXCLUDED.import_id,
                updated_at = NOW()
        """
        for stock in stocks:
            await self.db.execute(query, asdict(stock))
        return len(stocks)
    
    async def get_latest_stocks(
        self, brand_id: str | None = None
    ) -> list[WarehouseStockRecord]:
        """Текущие остатки на складе."""
        query = "SELECT * FROM warehouse_stocks"
        params = {}
        if brand_id:
            query += " WHERE brand_id = :brand_id"
            params["brand_id"] = brand_id
        return await self.db.fetch_all(query, params)
    
    async def get_latest_stock(self, article: str) -> int | None:
        """Остаток одного артикула."""
        query = "SELECT warehouse_stock FROM warehouse_stocks WHERE article = :article"
        row = await self.db.fetch_one(query, {"article": article})
        return row["warehouse_stock"] if row else None
    
    async def get_previous_stock(self, article: str) -> int | None:
        """Остаток из предыдущего импорта (для обнаружения аномалий)."""
        query = """
            SELECT warehouse_stock FROM warehouse_stocks_history
            WHERE article = :article
            ORDER BY imported_at DESC LIMIT 1
        """
        row = await self.db.fetch_one(query, {"article": article})
        return row["warehouse_stock"] if row else None
    
    async def get_last_import_date(self) -> datetime | None:
        query = "SELECT MAX(imported_at) as last FROM import_logs WHERE status = 'success'"
        row = await self.db.fetch_one(query)
        return row["last"] if row else None
    
    async def is_file_processed(self, file_name: str) -> bool:
        query = "SELECT 1 FROM import_logs WHERE file_name = :name AND status = 'success'"
        row = await self.db.fetch_one(query, {"name": file_name})
        return row is not None
    
    async def save_log(self, log: ImportLog) -> None:
        query = """
            INSERT INTO import_logs 
            (import_id, file_name, file_format, status, total_rows, valid_rows,
             unmapped_count, anomaly_count, parse_errors, unmapped_articles, imported_at)
            VALUES (:import_id, :file_name, :file_format, :status, :total_rows, :valid_rows,
                    :unmapped_count, :anomaly_count, :parse_errors, :unmapped_articles, :imported_at)
        """
        await self.db.execute(query, {
            **asdict(log),
            "parse_errors": json.dumps(log.parse_errors),
            "unmapped_articles": json.dumps(log.unmapped_articles)
        })
    
    async def get_import_history(
        self, limit: int = 20
    ) -> list[ImportLog]:
        query = "SELECT * FROM import_logs ORDER BY imported_at DESC LIMIT :limit"
        return await self.db.fetch_all(query, {"limit": limit})

5.8 API Endpoints

REST API

router = APIRouter(prefix="/logistic/import", tags=["1C Import"])


@router.post("/upload")
async def upload_file(
    file: UploadFile,
    service: ImportService = Depends(get_import_service),
    current_user: User = Depends(get_current_user)
) -> ImportResult:
    """
    Ручная загрузка файла 1С (XLSX/XML).
    Доступно: Manager+.
    """
    # Сохраняем во временный файл
    temp_path = Path(f"/tmp/1c_upload_{file.filename}")
    with open(temp_path, "wb") as f:
        content = await file.read()
        f.write(content)
    
    try:
        return await service.import_file(temp_path)
    finally:
        temp_path.unlink(missing_ok=True)


@router.post("/trigger-scan")
async def trigger_scan(
    service: ImportService = Depends(get_import_service),
    current_user: User = Depends(get_current_user)
) -> list[ImportResult]:
    """Ручной запуск сканирования директории."""
    return await service.scan_and_import()


@router.get("/history")
async def get_import_history(
    limit: int = Query(20, le=100),
    repo: ImportRepository = Depends(get_import_repo)
) -> list[ImportLog]:
    """История импортов."""
    return await repo.get_import_history(limit=limit)


@router.get("/stocks")
async def get_warehouse_stocks(
    brand_id: str | None = Query(None),
    repo: ImportRepository = Depends(get_import_repo),
    current_user: User = Depends(get_current_user)
) -> list[WarehouseStockRecord]:
    """Текущие остатки на складе 1С."""
    return await repo.get_latest_stocks(
        brand_id=brand_id or current_user.brand_id
    )


@router.get("/stocks/{article}")
async def get_article_stock(
    article: str,
    repo: ImportRepository = Depends(get_import_repo)
) -> dict:
    """Остаток конкретного артикула."""
    stock = await repo.get_latest_stock(article)
    return {"article": article, "warehouse_stock": stock}


@router.get("/mapping")
async def get_sku_mapping(
    brand_id: str | None = Query(None),
    limit: int = Query(100, le=1000),
    mapping_repo: SKUMappingRepository = Depends(get_mapping_repo)
) -> list[SKUMapping]:
    """Таблица маппинга артикулов."""
    return await mapping_repo.get_all(brand_id=brand_id, limit=limit)


@router.post("/mapping/sync-ozon")
async def sync_mapping_from_ozon(
    ozon_adapter: OzonLogisticAdapter = Depends(get_ozon_adapter),
    mapping_repo: SKUMappingRepository = Depends(get_mapping_repo),
    current_user: User = Depends(get_current_user)
) -> dict:
    """Синхронизация маппинга из каталога Ozon."""
    products = await ozon_adapter.get_product_list()
    count = await mapping_repo.sync_from_ozon(products)
    return {"synced": count}


@router.get("/export/supply-tasks")
async def export_supply_tasks(
    date: datetime | None = Query(None),
    status: str | None = Query(None),
    export_service: ExportService = Depends(get_export_service)
) -> FileResponse:
    """Экспорт наряд-заданий в XLSX для 1С."""
    from fastapi.responses import FileResponse
    
    file_path = await export_service.export_supply_tasks_xlsx(date, status)
    return FileResponse(
        path=str(file_path),
        filename=file_path.name,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )

5.9 Celery Tasks

Периодические задачи

CELERY_BEAT_SCHEDULE = {
    "scan-1c-import-morning": {
        "task": "logistic.tasks.scan_1c_import",
        "schedule": crontab(hour=6, minute=30),
    },
    "scan-1c-import-evening": {
        "task": "logistic.tasks.scan_1c_import",
        "schedule": crontab(hour=15, minute=0),
    },
    "sync-sku-mapping-weekly": {
        "task": "logistic.tasks.sync_sku_mapping",
        "schedule": crontab(hour=2, minute=0, day_of_week=1),  # пн 02:00
    },
}


@shared_task(bind=True, max_retries=2, default_retry_delay=300)
def scan_1c_import(self):
    """Сканирование директории и импорт файлов 1С."""
    import asyncio
    
    async def _scan():
        service = get_import_service()
        return await service.scan_and_import()
    
    results = asyncio.run(_scan())
    
    return {
        "files_processed": len(results),
        "successful": sum(1 for r in results if r.success),
        "failed": sum(1 for r in results if not r.success)
    }


@shared_task
def sync_sku_mapping():
    """Синхронизация маппинга артикулов из Ozon."""
    import asyncio
    
    async def _sync():
        adapter = get_ozon_adapter()
        repo = get_mapping_repo()
        products = await adapter.get_product_list()
        return await repo.sync_from_ozon(products)
    
    count = asyncio.run(_sync())
    return {"synced_count": count}

5.10 Алерты

ТипSeverityТриггерОписание
IMPORT_SUCCESSLOWУспешный импортN строк импортировано
IMPORT_ERRORHIGHОшибка парсинга/валидацииФайл не обработан
IMPORT_ANOMALYMEDIUMИзменение > 50%Аномальное изменение остатков
UNMAPPED_ARTICLESMEDIUM> 10 артикулов без маппингаНужна синхронизация с Ozon

5.11 Промпт для Claude Code

Реализуй 1С Integration для модуля Logistic согласно 
adolf_logistic_5_1c_integration_v2_0.md

Требования:
1. FileImportAdapter: парсинг XLSX (openpyxl) и XML (lxml), 
   автоопределение формата, валидация строк
2. ImportService: scan_and_import() — сканирование директории, 
   парсинг, маппинг артикулов, обнаружение аномалий (Δ > 50%), 
   upsert, архивирование в YYYY-MM/
3. SKUMappingRepository: маппинг article ↔ ozon_sku, 
   sync_from_ozon() для первоначального заполнения
4. ExportService: export_supply_tasks_xlsx() — наряд-задания в XLSX
5. ImportRepository: upsert_warehouse_stocks, get_latest_stock, 
   import_logs, is_file_processed
6. API: POST /upload, POST /trigger-scan, GET /history, GET /stocks, 
   GET /mapping, POST /mapping/sync-ozon, GET /export/supply-tasks
7. Celery: импорт 06:30 + 15:00, sync маппинга еженедельно (пн)
8. Алерты: IMPORT_ERROR, IMPORT_ANOMALY, UNMAPPED_ARTICLES

Зависимости: openpyxl, lxml, OzonLogisticAdapter (раздел 2),
AlertService, SupplyTaskRepository (раздел 4)

Документ подготовлен: Февраль 2026
Версия: 2.0
Статус: Черновик
Заменяет: adolf_logistic_5_recommendation_engine_v1_0.md