# app/utils/office_reporter.py
from datetime import datetime
from typing import Optional, Dict, Any

from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from app.core.database import get_db_session
class OfficeReporter:
    """Report an agent's status to the Office Dashboard.

    Every ``report_*`` method upserts the agent's row in
    ``office_agent_status`` (keyed by ``agent_id``) and commits right away.
    Each public method accepts an optional ``db`` session; when it is
    omitted, a session is obtained from ``get_db_session()`` for the
    duration of the call. A caller-supplied session is committed as well.

    Usage::

        reporter = OfficeReporter(
            agent_id="watcher_price_monitor",
            department="watcher",
            name="Price monitoring",
            salary_equivalent=60000,
            fte_coefficient=1.0,
        )
        reporter.report_working("Scanning prices", {"products_scanned": 100})
        reporter.report_idle()
        reporter.report_error("API timeout")
        reporter.heartbeat()
    """

    def __init__(
        self,
        agent_id: str,
        department: str,
        name: str,
        brand: Optional[str] = None,
        salary_equivalent: int = 60000,
        fte_coefficient: float = 1.0,
    ):
        """Store the agent's identity fields that are written on every upsert.

        Args:
            agent_id: Unique key of the agent's row (the upsert conflict target).
            department: Department the agent is listed under.
            name: Display name of the agent.
            brand: Optional brand value stored with the agent.
            salary_equivalent: Value stored in the ``salary_equivalent`` column.
            fte_coefficient: Value stored in the ``fte_coefficient`` column.
        """
        self.agent_id = agent_id
        self.department = department
        self.name = name
        self.brand = brand
        self.salary_equivalent = salary_equivalent
        self.fte_coefficient = fte_coefficient

    def _with_session(self, db: "Optional[Session]", action) -> None:
        """Call ``action(session)`` on ``db``, or on a fresh session if ``db`` is None.

        Args:
            db: Session supplied by the caller, or ``None`` to open one via
                ``get_db_session()``.
            action: Callable taking a single session argument.
        """
        if db is not None:
            action(db)
            return
        with get_db_session() as session:
            action(session)

    def _upsert(
        self,
        db: "Session",
        status: str,
        task: Optional[str] = None,
        metrics: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert or update this agent's row in ``office_agent_status`` and commit.

        On first insert all identity fields are written. On conflict on
        ``agent_id`` only ``status``, ``task``, ``metrics``, ``last_activity``
        and ``updated_at`` are overwritten.

        Args:
            db: Session used to execute and commit the statement.
            status: Status value to store.
            task: Current task description, or ``None``.
            metrics: Metrics mapping; a falsy value is stored as ``{}``.
        """
        # NOTE(review): OfficeAgentStatus is not imported in the visible part
        # of this module; confirm it is brought into scope elsewhere,
        # otherwise this method raises NameError.

        # Take the timestamp once so last_activity and updated_at are
        # identical within a single write. Kept as naive UTC, matching what
        # this code has always written.
        now = datetime.utcnow()
        mutable_fields = {
            "status": status,
            "task": task,
            "metrics": metrics or {},
            "last_activity": now,
            "updated_at": now,
        }
        stmt = insert(OfficeAgentStatus).values(
            agent_id=self.agent_id,
            department=self.department,
            name=self.name,
            brand=self.brand,
            salary_equivalent=self.salary_equivalent,
            fte_coefficient=self.fte_coefficient,
            **mutable_fields,
        ).on_conflict_do_update(
            index_elements=["agent_id"],
            set_=mutable_fields,
        )
        db.execute(stmt)
        db.commit()

    def _report(
        self,
        db: "Optional[Session]",
        status: str,
        task: Optional[str],
        metrics: Optional[Dict[str, Any]],
    ) -> None:
        """Upsert the given status using ``db`` or a freshly opened session."""
        self._with_session(
            db,
            lambda session: self._upsert(
                session, status=status, task=task, metrics=metrics
            ),
        )

    def report_working(
        self,
        task: str,
        metrics: Optional[Dict[str, Any]] = None,
        db: "Optional[Session]" = None,
    ) -> None:
        """Report status ``"ok"`` with the task the agent is working on."""
        self._report(db, "ok", task, metrics)

    def report_idle(
        self,
        metrics: Optional[Dict[str, Any]] = None,
        db: "Optional[Session]" = None,
    ) -> None:
        """Report status ``"ok"`` with no task (the agent is waiting for work)."""
        self._report(db, "ok", None, metrics)

    def report_warning(
        self,
        task: str,
        metrics: Optional[Dict[str, Any]] = None,
        db: "Optional[Session]" = None,
    ) -> None:
        """Report status ``"warning"`` (needs attention) for the given task."""
        self._report(db, "warning", task, metrics)

    def report_error(
        self,
        error_message: str,
        metrics: Optional[Dict[str, Any]] = None,
        db: "Optional[Session]" = None,
    ) -> None:
        """Report status ``"error"``; the message is stored in the task field."""
        self._report(db, "error", f"Ошибка: {error_message}", metrics)

    def report_offline(self, db: "Optional[Session]" = None) -> None:
        """Report status ``"offline"`` with no task and empty metrics."""
        self._report(db, "offline", None, {})

    def heartbeat(self, db: "Optional[Session]" = None) -> None:
        """Refresh ``last_activity`` and ``updated_at`` without touching the status.

        This is a plain UPDATE filtered by ``agent_id``, so it changes nothing
        if the agent's row has not been created yet.
        """
        # Raw SQL strings must be wrapped in text(): SQLAlchemy 2.0 no longer
        # accepts a bare str in Session.execute().
        stmt = text(
            "UPDATE office_agent_status SET last_activity = NOW(), updated_at = NOW() WHERE agent_id = :agent_id"
        )
        params = {"agent_id": self.agent_id}

        def _touch(session) -> None:
            session.execute(stmt, params)
            session.commit()

        self._with_session(db, _touch)