"""
Bridge between the n8n `Report: Build Payload` JSON and the existing
`create_docx_report()` entry point.

Keeping this isolated so the rest of the codebase never learns about n8n.
"""

from __future__ import annotations

import logging
import math
from typing import Any

import pandas as pd

from src.config import config

logger = logging.getLogger(__name__)

# Canonical column name -> payload spellings accepted for it. Matching is
# case-insensitive and the first candidate present in the payload wins.
_LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = {
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    "current": ["current", "peak_current", "peakCurrent", "amplitude", "amp"],
    "p_type": ["p_type", "ptype", "type", "flash_type"],
    "local_time": [
        "local_time",
        "localtime",
        "time",
        "timestamp",
        "captured",
        "datetime",
        "date_time",
    ],
}

_TURBINE_COLUMN_ALIASES: dict[str, list[str]] = {
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    "name": ["name", "turbine_name", "turbine_id"],
    "unit_power_mwm": ["unit_power_mwm", "power_mwm"],
    "unit_power_mwe": ["unit_power_mwe", "power_mwe"],
    "tower_height_m": ["tower_height_m", "tower_height"],
    "turbine_rotor_blade_diameter": [
        "turbine_rotor_blade_diameter",
        "rotor_diameter",
        "rotor_blade_diameter",
    ],
    "altitude": ["altitude", "elevation"],
}

# Turbine attributes that may be absent from the payload; they are always
# present in the returned frame (filled with "N/A" when the payload lacks them).
_TURBINE_OPTIONAL_COLUMNS: tuple[str, ...] = (
    "unit_power_mwm",
    "unit_power_mwe",
    "tower_height_m",
    "turbine_rotor_blade_diameter",
    "altitude",
)


def _apply_aliases(df: pd.DataFrame, aliases: dict[str, list[str]]) -> pd.DataFrame:
    """Rename payload columns to the canonical names in *aliases*.

    For each canonical target that is not already a column, the first
    candidate spelling found (case-insensitively) is renamed to the target.
    A source column is never claimed by more than one target. An empty
    frame, or one that needs no renames, is returned unchanged.
    """
    if df.empty:
        return df

    lower_to_actual = {str(c).lower(): c for c in df.columns}
    rename_map: dict[str, str] = {}
    for target, candidates in aliases.items():
        if target in df.columns:
            continue
        for candidate in candidates:
            src = lower_to_actual.get(candidate.lower())
            if src is not None and src not in rename_map:
                rename_map[src] = target
                break
    return df.rename(columns=rename_map) if rename_map else df


def _build_turbine_df(turbines: list[dict[str, Any]]) -> pd.DataFrame:
    """Normalize raw turbine records into the turbine DataFrame.

    Columns are renamed via ``_TURBINE_COLUMN_ALIASES`` and ``lat``/``lng``
    are coerced to numbers. Rows whose coordinates are missing or
    non-numeric are dropped. A ``name`` column (``T1``, ``T2``, ...) is
    synthesized when the payload has none, and every optional attribute in
    ``_TURBINE_OPTIONAL_COLUMNS`` is filled with ``"N/A"`` when absent.

    Raises:
        ValueError: if ``lat``/``lng`` are missing after alias resolution.
    """
    if not turbines:
        # Same column set as the populated path, so downstream code can rely
        # on the optional columns existing even when there are no turbines.
        return pd.DataFrame(columns=["name", "lat", "lng", *_TURBINE_OPTIONAL_COLUMNS])

    df = _apply_aliases(pd.DataFrame(turbines), _TURBINE_COLUMN_ALIASES)
    missing = [c for c in ("lat", "lng") if c not in df.columns]
    if missing:
        raise ValueError(
            f"Turbine payload missing required column(s) after normalization: {missing}"
        )

    for coord in ("lat", "lng"):
        df[coord] = pd.to_numeric(df[coord], errors="coerce")
    df = df.dropna(subset=["lat", "lng"]).reset_index(drop=True)

    if "name" not in df.columns:
        # Names are assigned after dropping bad rows so numbering is contiguous.
        df["name"] = [f"T{i + 1}" for i in range(len(df))]

    for optional_col in _TURBINE_OPTIONAL_COLUMNS:
        if optional_col not in df.columns:
            df[optional_col] = "N/A"

    return df


def _build_lightning_df(
    strikes: list[dict[str, Any]],
    timezone_name: str | None,
) -> pd.DataFrame:
    """Normalize raw strike records into the lightning DataFrame.

    Columns are renamed via ``_LIGHTNING_COLUMN_ALIASES``. ``lat``/``lng``/
    ``current`` are coerced to numbers (invalid -> NaN) and ``p_type`` to
    ``str``. ``local_time`` is parsed as tz-aware UTC, then converted to
    *timezone_name* when one is given. Rows lacking coordinates or a
    parseable time are dropped. ``current_abs`` is derived from ``current``
    when the payload does not supply it.

    Raises:
        ValueError: if a required column is missing after alias resolution.
    """
    base_columns = ["lat", "lng", "current", "p_type", "local_time", "current_abs"]
    if not strikes:
        return pd.DataFrame(columns=base_columns)

    df = _apply_aliases(pd.DataFrame(strikes), _LIGHTNING_COLUMN_ALIASES)
    required = ("lat", "lng", "current", "p_type", "local_time")
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"Lightning payload missing required column(s) after normalization: {missing}"
        )

    for col in ("lat", "lng", "current"):
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["p_type"] = df["p_type"].astype(str)

    # utc=True treats naive timestamps as UTC before any conversion.
    # NOTE(review): purely numeric values are parsed with pandas' default
    # unit (nanoseconds); t_start/t_end in this module are epoch *ms* --
    # confirm the strike time format sent by n8n.
    local_time = pd.to_datetime(df["local_time"], errors="coerce", utc=True)
    if timezone_name:
        try:
            local_time = local_time.dt.tz_convert(timezone_name)
        except Exception:
            # Best effort: keep UTC times, but make the fallback visible
            # instead of silently reporting in the wrong timezone.
            logger.warning(
                "Could not convert strike times to timezone %r; keeping UTC.",
                timezone_name,
                exc_info=True,
            )
    df["local_time"] = local_time

    df = df.dropna(subset=["lat", "lng", "local_time"]).reset_index(drop=True)
    if "current_abs" not in df.columns:
        df["current_abs"] = df["current"].abs()
    return df


def _coerce_float(value: Any) -> float | None:
    """Best-effort conversion of a payload scalar to a finite ``float``.

    Returns ``None`` for ``None``, ``""``, values ``float()`` rejects, and
    non-finite results (NaN/inf), so callers can treat all of them as
    "not provided".
    """
    if value is None or value == "":
        return None
    try:
        result = float(value)
    except (TypeError, ValueError):
        return None
    # "nan"/"inf" parse fine but would make int(boundary_m) raise in
    # apply_farm_config, or be stored as a NaN centroid.
    return result if math.isfinite(result) else None


def _epoch_ms_to_local_str(epoch_ms: Any, timezone_name: str | None) -> str | None:
    """Format an epoch-milliseconds value as ``DD-MM-YYYY HH:MM``.

    The timestamp is converted to *timezone_name* when given, otherwise it
    is formatted in UTC. Returns ``None`` when the value is missing, zero,
    or non-numeric, or when conversion fails (e.g. an unknown timezone).
    """
    ms = _coerce_float(epoch_ms)
    # Zero (in any numeric form) means "unset", not the Unix epoch.
    if not ms:
        return None
    try:
        ts = pd.to_datetime(int(ms), unit="ms", utc=True)
        if timezone_name:
            ts = ts.tz_convert(timezone_name)
        return ts.strftime("%d-%m-%Y %H:%M")
    except Exception:
        logger.warning(
            "Could not format epoch ms %r for timezone %r.",
            epoch_ms,
            timezone_name,
            exc_info=True,
        )
        return None


def apply_farm_config(payload: dict[str, Any]) -> None:
    """
    Mutate the global `src.config.config` singleton per request so the rest of
    the reporting pipeline reads farm-specific values from the n8n payload.

    Attributes written: ``distance_rings`` and ``ring_colors`` (only when
    provided), ``wind_farm_name``, ``timezone``, ``centroid_lat``,
    ``centroid_lon`` and ``analysis_boundary_m`` (reset to ``None`` when
    absent), and ``analysis_start_date``/``analysis_end_date`` (only when
    ``t_start``/``t_end`` yield a label).

    NOTE(review): attributes that are only written when present
    (rings, colors, timezone, start/end dates) keep whatever an earlier
    request set, because ``config`` is a shared singleton. Confirm that
    every request supplies them, or reset them explicitly.
    """
    rings_obj = payload.get("rings") or {}
    ordered_rings = [int(rings_obj[k]) for k in ("r1", "r2", "r3", "r4", "r5") if k in rings_obj]
    if ordered_rings:
        config.distance_rings = ordered_rings

    ring_colors = payload.get("ring_colors")
    if ring_colors:
        config.ring_colors = list(ring_colors)

    config.wind_farm_name = payload.get("customer_name") or config.wind_farm_name or "Wind Farm"
    config.timezone = payload.get("timezone") or config.timezone

    # Trust the farm-wide centroid provided by the n8n workflow instead of
    # recomputing it from turbine coordinates downstream. "centroid_lng" is
    # accepted as a fallback spelling whenever "centroid_lon" is absent or
    # unusable (e.g. an empty string).
    centroid_lat = _coerce_float(payload.get("centroid_lat"))
    centroid_lon = _coerce_float(payload.get("centroid_lon"))
    if centroid_lon is None:
        centroid_lon = _coerce_float(payload.get("centroid_lng"))
    config.centroid_lat = centroid_lat
    config.centroid_lon = centroid_lon

    # n8n-supplied monitoring boundary is the authoritative outer analysis radius.
    boundary_m = _coerce_float(payload.get("boundary_m"))
    config.analysis_boundary_m = int(boundary_m) if boundary_m is not None else None

    start_label = _epoch_ms_to_local_str(payload.get("t_start"), config.timezone)
    end_label = _epoch_ms_to_local_str(payload.get("t_end"), config.timezone)
    if start_label:
        config.analysis_start_date = start_label
    if end_label:
        config.analysis_end_date = end_label


def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return (turbine_df, lightning_df) ready for `create_docx_report()`.

    Strike times are converted to the payload's ``timezone``, falling back
    to ``config.timezone`` when the payload has none.
    """
    timezone_name = payload.get("timezone") or config.timezone
    turbine_df = _build_turbine_df(payload.get("turbines") or [])
    lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name)
    return turbine_df, lightning_df