# services/marketing/bid_cycle.py
import asyncio
import logging
from datetime import datetime, timezone
from typing import List

from core.redis_lock import RedisLock
from core.notifications import NotificationService

from .safety_logic import SafetyLogic, SafetyAction, CampaignStats, KeywordStats
from .bid_engine import BidEngine, BidCalculation
from .adapters.manager import AdapterManager
logger = logging.getLogger(__name__)
class BidCorrectionCycle:
"""Bid correction cycle.

Walks every marketplace adapter, its active campaigns and their keywords,
runs the safety checks and applies the bids computed by the bid engine.
"""
# Key handed to RedisLock in run(); when the lock cannot be acquired the
# cycle is skipped.
LOCK_KEY = "marketing:bid_cycle:lock"
# Passed to RedisLock as the lock TTL.
# NOTE(review): a cycle that runs longer than this may outlive its lock --
# confirm against the RedisLock implementation.
LOCK_TTL = 300 # 5 minutes
def __init__(
    self,
    adapter_manager: AdapterManager,
    safety_logic: SafetyLogic,
    bid_engine: BidEngine,
    watcher_client,
    db_session,
    notification_service: NotificationService,
    redis_client
):
    """Keep references to the collaborators the cycle works with.

    Args:
        adapter_manager: Provides the marketplace adapters via
            ``get_all_adapters()``.
        safety_logic: Campaign-level and keyword-level safety checks.
        bid_engine: Computes the new bid for a keyword.
        watcher_client: Client queried for competitor bids.
        db_session: Stored as ``self.db``.
        notification_service: Stored as ``self.notifications``.
        redis_client: Redis client used to build the cycle lock.
    """
    # Infrastructure handles.
    self.redis = redis_client
    self.db = db_session
    self.notifications = notification_service

    # External data sources.
    self.watcher = watcher_client
    self.adapters = adapter_manager

    # Decision-making components.
    self.bid_engine = bid_engine
    self.safety = safety_logic
async def run(self):
    """Execute one bid correction cycle under the Redis lock.

    When the lock cannot be acquired a warning is logged and nothing else
    happens. Otherwise the cycle is executed and the lock is released
    afterwards, including when the cycle raises.
    """
    cycle_lock = RedisLock(self.redis, self.LOCK_KEY, self.LOCK_TTL)
    acquired = await cycle_lock.acquire()

    if acquired:
        try:
            await self._execute_cycle()
        finally:
            # Always hand the lock back, even if the cycle failed.
            await cycle_lock.release()
    else:
        logger.warning("Bid cycle already running, skipping")
async def _execute_cycle(self):
    """Process every marketplace and hand the counters to ``_save_cycle_stats``.

    A failure while processing one marketplace is logged with its traceback
    and counted in ``stats["errors"]``; the remaining marketplaces are still
    processed.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # "now" in UTC and drop the tzinfo so the value passed on below is the
    # same naive-UTC timestamp as before.
    cycle_start = datetime.now(timezone.utc).replace(tzinfo=None)
    logger.info(f"Starting bid correction cycle at {cycle_start}")

    # Counters shared with (and mutated by) the per-marketplace,
    # per-campaign and per-keyword steps.
    stats = {
        "campaigns_processed": 0,
        "keywords_processed": 0,
        "bids_updated": 0,
        "keywords_paused": 0,
        "campaigns_paused": 0,
        "alerts_sent": 0,
        "errors": 0,
    }

    for mp_code, adapter in self.adapters.get_all_adapters().items():
        try:
            await self._process_marketplace(mp_code, adapter, stats)
        except Exception as e:
            # Best-effort boundary: one broken marketplace must not stop
            # the others. logger.exception keeps the traceback.
            logger.exception("Error processing %s: %s", mp_code, e)
            stats["errors"] += 1

    await self._save_cycle_stats(cycle_start, stats)
    logger.info(f"Bid cycle completed: {stats}")
async def _process_marketplace(self, mp_code: str, adapter, stats: dict):
    """Process every active campaign of one marketplace.

    Args:
        mp_code: Marketplace code, forwarded to the campaign step.
        adapter: Marketplace adapter; queried for campaigns with
            ``status="active"``.
        stats: Cycle counters, updated in place.

    A failure in one campaign is logged with its traceback and counted in
    ``stats["errors"]``; the remaining campaigns are still processed.
    """
    campaigns = await adapter.get_campaigns(status="active")

    for campaign in campaigns:
        try:
            await self._process_campaign(mp_code, adapter, campaign, stats)
        except Exception as e:
            # Best-effort boundary: keep going with the next campaign, but
            # preserve the traceback for diagnosis.
            logger.exception(
                "Error processing campaign %s: %s", campaign.external_id, e
            )
            stats["errors"] += 1
        else:
            stats["campaigns_processed"] += 1
async def _process_campaign(self, mp_code: str, adapter, campaign, stats: dict):
    """Run the campaign-level safety check, then process its keywords.

    Args:
        mp_code: Marketplace code, forwarded to the keyword step.
        adapter: Marketplace adapter used to pause the campaign and to
            list its keywords.
        campaign: Campaign object; ``external_id`` and ``brand_id`` are
            read here.
        stats: Cycle counters, updated in place.

    If the safety check asks for the campaign to be paused, the campaign is
    paused, an alert is sent and keywords are not processed. If it asks for
    an alert only, the alert is sent and keyword processing continues.
    A failure in one keyword is logged with its traceback and counted in
    ``stats["errors"]``; the remaining keywords are still processed.
    """
    campaign_stats = await self._get_campaign_stats(adapter, campaign)
    camp_result = self.safety.check_campaign(campaign_stats)

    if camp_result.action == SafetyAction.PAUSE_CAMPAIGN:
        await adapter.pause_campaign(campaign.external_id)
        # Count the pause as soon as pause_campaign returned: if sending
        # the alert below fails, the campaign is still paused and the
        # statistics must reflect that.
        stats["campaigns_paused"] += 1
        await self._send_alert(
            campaign.brand_id,
            "campaign_paused",
            camp_result.message,
            camp_result.severity,
        )
        stats["alerts_sent"] += 1
        return

    if camp_result.action == SafetyAction.ALERT:
        await self._send_alert(
            campaign.brand_id,
            "budget_warning",
            camp_result.message,
            camp_result.severity,
        )
        stats["alerts_sent"] += 1

    keywords = await adapter.get_keywords(campaign.external_id)
    for keyword in keywords:
        try:
            await self._process_keyword(
                mp_code, adapter, campaign, keyword, stats
            )
        except Exception as e:
            # Best-effort boundary: keep going with the next keyword, but
            # preserve the traceback for diagnosis.
            logger.exception(
                "Error processing keyword %s: %s", keyword.external_id, e
            )
            stats["errors"] += 1
        else:
            stats["keywords_processed"] += 1
async def _process_keyword(
    self,
    mp_code: str,
    adapter,
    campaign,
    keyword,
    stats: dict
):
    """Compute, safety-check and apply the bid for a single keyword.

    Args:
        mp_code: Marketplace code passed to the watcher query.
        adapter: Marketplace adapter used to pause the keyword or update
            its bid.
        campaign: Owning campaign; ``external_id`` and ``brand_id`` are
            read here.
        keyword: Keyword object; ``external_id``, ``keyword`` and the
            optional ``sku`` attribute are read here.
        stats: Cycle counters, updated in place.

    Steps: fetch competitor bids, load the strategy configuration, let the
    bid engine calculate a bid, run the keyword-level safety check, then
    either pause the keyword, or (optionally capped) apply the new bid when
    the calculation says it should be applied.
    """
    # ``sku`` is optional on the keyword object; fall back to None.
    watcher_data = await self.watcher.get_competitor_bids(
        sku=getattr(keyword, "sku", None),
        keyword=keyword.keyword,
        marketplace=mp_code,
    )

    strategy_config = await self._get_strategy_config(campaign, keyword)

    # Bid calculation.
    keyword_data = self._to_keyword_data(keyword)
    competitor_data = self._to_competitor_data(watcher_data)
    context = self._to_campaign_context(campaign)
    bid_calc = self.bid_engine.calculate_bid(
        keyword_data, strategy_config, competitor_data, context
    )

    # Keyword-level safety check.
    kw_stats = self._to_keyword_stats(keyword, campaign.external_id)
    safety_result = self.safety.check_keyword(
        kw_stats,
        strategy_config.max_bid,
        bid_calc.new_bid,
    )

    if safety_result.action == SafetyAction.PAUSE_KEYWORD:
        await adapter.pause_keyword(keyword.external_id)
        # Count the pause before writing the audit record: a failure while
        # logging must not hide a pause that already happened.
        stats["keywords_paused"] += 1
        await self._log_bid_change(keyword, None, "paused", safety_result.reason)
        return

    if safety_result.action == SafetyAction.CAP_BID:
        # Replace the calculated bid with the capped value and notify.
        bid_calc.new_bid = safety_result.capped_bid
        await self._send_alert(
            campaign.brand_id,
            "bid_capped",
            safety_result.message,
            safety_result.severity,
        )
        stats["alerts_sent"] += 1

    if not bid_calc.should_apply:
        return

    result = await adapter.update_bid(keyword.external_id, bid_calc.new_bid)
    if not result.success:
        # Include the keyword id so the failure can be traced to its source.
        logger.error(
            "Failed to update bid for keyword %s: %s",
            keyword.external_id,
            result.error,
        )
        stats["errors"] += 1
        return

    # Count the update before writing the audit record, for the same reason
    # as for pauses above.
    stats["bids_updated"] += 1
    await self._log_bid_change(
        keyword,
        bid_calc.new_bid,
        "updated",
        bid_calc.reason,
    )