335 lines
12 KiB
Python

"""
Bridge between the n8n `Report: Build Payload` JSON and the existing
`create_docx_report()` entry point.
Kept in its own module so the rest of the codebase never needs to know about n8n.
"""
from __future__ import annotations

import math
from datetime import datetime, timezone
from typing import Any

import pandas as pd

from src.config import config
# Keys under which n8n / the storm API may nest the list of storm cells.
# `_unwrap_storm_envelope` tries them in this order and uses the first one
# whose value is a list.
_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items")

# Canonical lightning column -> accepted source column names.
# `_apply_aliases` matches candidates case-insensitively, in list order, and
# only when the canonical name is not already an exact column.
_LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = {
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    "current": ["current", "peak_current", "peakCurrent", "amplitude", "amp"],
    "p_type": ["p_type", "ptype", "type", "flash_type"],
    # Any of these may carry the strike time (ISO string or epoch number).
    "local_time": [
        "local_time", "localtime", "captured", "datetime",
        "date_time", "time", "timestamp",
    ],
}

# Canonical turbine column -> accepted source column names (same matching
# rules as the lightning table above).
_TURBINE_COLUMN_ALIASES: dict[str, list[str]] = {
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    "name": ["name", "turbine_name", "turbine_id"],
    "unit_power_mwm": ["unit_power_mwm", "power_mwm"],
    "unit_power_mwe": ["unit_power_mwe", "power_mwe"],
    "tower_height_m": ["tower_height_m", "tower_height"],
    "turbine_rotor_blade_diameter": [
        "turbine_rotor_blade_diameter", "rotor_diameter", "rotor_blade_diameter",
    ],
    "altitude": ["altitude", "elevation"],
}
def _apply_aliases(df: pd.DataFrame, aliases: dict[str, list[str]]) -> pd.DataFrame:
    """
    Rename columns of *df* to canonical names using case-insensitive aliases.

    For each canonical ``target`` in *aliases* that is not already an exact
    column, the first candidate (in list order) matching an existing column
    case-insensitively is renamed to ``target``. A source column is never
    claimed twice, and a column that is itself named after another canonical
    target is never renamed, so one target cannot steal another's column.

    Aliasing only concerns column names, so zero-row frames that still carry
    columns are renamed too; only a frame with no columns at all is returned
    unchanged.

    Returns:
        *df* itself when nothing needs renaming, otherwise a renamed copy.
    """
    if len(df.columns) == 0:
        return df
    # If two columns differ only by case, the later one wins here.
    lower_to_actual = {str(col).lower(): col for col in df.columns}
    rename_map: dict[Any, str] = {}
    for target, candidates in aliases.items():
        if target in df.columns:
            continue
        for candidate in candidates:
            source = lower_to_actual.get(candidate.lower())
            if source is None or source in rename_map or source in aliases:
                continue
            rename_map[source] = target
            break
    return df.rename(columns=rename_map) if rename_map else df
def _parse_local_time_series(series: pd.Series) -> pd.Series:
    """
    Convert a column of strike timestamps into tz-aware UTC datetimes.

    Non-numeric columns are first parsed as date strings. If that yields at
    least one valid value, that result is returned. Otherwise (and always for
    numeric columns) the values are read as numbers. The epoch unit is guessed
    from the first valid number: >= 1e12 means milliseconds, >= 1e9 means
    seconds. Anything else falls back to pandas' default parsing. An empty
    series is returned unchanged.

    Why the unit guess: n8n may add a numeric ``timestamp`` field (epoch ms).
    Pandas treats bare integers as nanoseconds by default, which would
    collapse ms epochs to ~1970.
    """
    if series.empty:
        return series

    if not pd.api.types.is_numeric_dtype(series):
        as_text = pd.to_datetime(series, errors="coerce", utc=True)
        if as_text.notna().any():
            return as_text

    as_number = pd.to_numeric(series, errors="coerce")
    valid = as_number.dropna()
    if not valid.empty:
        first = float(valid.iloc[0])
        for threshold, unit in ((1e12, "ms"), (1e9, "s")):
            if first >= threshold:
                return pd.to_datetime(as_number, unit=unit, errors="coerce", utc=True)

    return pd.to_datetime(series, errors="coerce", utc=True)
# Spec columns every turbine frame carries ("N/A" when the payload lacks them).
_TURBINE_OPTIONAL_COLUMNS = (
    "unit_power_mwm",
    "unit_power_mwe",
    "tower_height_m",
    "turbine_rotor_blade_diameter",
    "altitude",
)


def _build_turbine_df(turbines: list[dict[str, Any]]) -> pd.DataFrame:
    """
    Normalise raw turbine dicts into the turbine frame the report expects.

    Column names are resolved through ``_TURBINE_COLUMN_ALIASES``. lat/lng are
    coerced to numbers, and rows where either is unparseable are dropped.
    When no name column exists, turbines get positional names ``T1``,
    ``T2``, ... Each optional spec column missing from the payload is filled
    with ``"N/A"``.

    An empty or absent list yields a zero-row frame with the same columns
    (name, lat, lng and every optional spec column), so downstream code can
    index those columns without special-casing "no turbines".

    Raises:
        ValueError: if turbines are supplied but lat/lng cannot be resolved.
    """
    if not turbines:
        schema = {
            "name": pd.Series(dtype="object"),
            "lat": pd.Series(dtype="float64"),
            "lng": pd.Series(dtype="float64"),
        }
        schema.update({col: pd.Series(dtype="object") for col in _TURBINE_OPTIONAL_COLUMNS})
        return pd.DataFrame(schema)

    df = _apply_aliases(pd.DataFrame(turbines), _TURBINE_COLUMN_ALIASES)
    missing = [c for c in ("lat", "lng") if c not in df.columns]
    if missing:
        raise ValueError(
            f"Turbine payload missing required column(s) after normalization: {missing}"
        )
    df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
    df["lng"] = pd.to_numeric(df["lng"], errors="coerce")
    df = df.dropna(subset=["lat", "lng"]).reset_index(drop=True)
    if "name" not in df.columns:
        df["name"] = [f"T{i + 1}" for i in range(len(df))]
    for optional_col in _TURBINE_OPTIONAL_COLUMNS:
        if optional_col not in df.columns:
            df[optional_col] = "N/A"
    return df
def _convert_timezone(times: pd.Series, timezone_name: str | None) -> pd.Series:
    """Best-effort conversion of tz-aware *times* to *timezone_name*.

    When no zone is given, or the conversion fails (e.g. an unknown zone
    name), *times* is returned unchanged rather than aborting the report.
    """
    if not timezone_name:
        return times
    try:
        return times.dt.tz_convert(timezone_name)
    except Exception:  # deliberate best-effort: a bad zone must not kill the report
        return times


def _build_lightning_df(
    strikes: list[dict[str, Any]],
    timezone_name: str | None,
) -> pd.DataFrame:
    """
    Normalise raw strike dicts into the lightning frame the report expects.

    Output columns: lat, lng, current, p_type, local_time, current_abs.
    lat, lng, current and current_abs are numeric (unparseable values become
    NaN). p_type is str. local_time is tz-aware and converted to
    *timezone_name* when that zone is valid; otherwise it stays in UTC. Rows
    lacking lat, lng or a parseable local_time are dropped. current_abs is
    taken from the payload when present, otherwise derived as ``abs(current)``.

    An empty or absent strike list yields a zero-row frame with the same
    columns and dtypes (including a tz-aware local_time), so ``.dt`` access
    downstream works even when there were no strikes.

    Raises:
        ValueError: if strikes are supplied but a required column is missing
            after alias resolution.
    """
    if not strikes:
        empty_times = _convert_timezone(
            pd.Series(dtype="datetime64[ns, UTC]"), timezone_name
        )
        return pd.DataFrame(
            {
                "lat": pd.Series(dtype="float64"),
                "lng": pd.Series(dtype="float64"),
                "current": pd.Series(dtype="float64"),
                "p_type": pd.Series(dtype="object"),
                "local_time": empty_times,
                "current_abs": pd.Series(dtype="float64"),
            }
        )

    df = _apply_aliases(pd.DataFrame(strikes), _LIGHTNING_COLUMN_ALIASES)
    required = ("lat", "lng", "current", "p_type", "local_time")
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"Lightning payload missing required column(s) after normalization: {missing}"
        )
    for numeric_col in ("lat", "lng", "current"):
        df[numeric_col] = pd.to_numeric(df[numeric_col], errors="coerce")
    df["p_type"] = df["p_type"].astype(str)
    df["local_time"] = _convert_timezone(
        _parse_local_time_series(df["local_time"]), timezone_name
    )
    df = df.dropna(subset=["lat", "lng", "local_time"]).reset_index(drop=True)
    if "current_abs" in df.columns:
        # Payload-supplied values get the same numeric coercion as `current`.
        df["current_abs"] = pd.to_numeric(df["current_abs"], errors="coerce")
    else:
        df["current_abs"] = df["current"].abs()
    return df
def _epoch_ms_to_local_str(epoch_ms: Any, timezone_name: str | None) -> str | None:
    """
    Render an epoch-milliseconds value as ``DD-MM-YYYY HH:MM``.

    The time is shown in *timezone_name*, or in UTC when that is falsy.
    Returns ``None`` for ``None``, ``""`` or ``0``. It also returns ``None``
    when any step fails, e.g. a non-integer value or an unknown zone name.
    """
    is_blank = epoch_ms in (None, "", 0)
    if is_blank:
        return None
    try:
        stamp = pd.to_datetime(int(epoch_ms), unit="ms", utc=True)
        shown = stamp.tz_convert(timezone_name) if timezone_name else stamp
        return shown.strftime("%d-%m-%Y %H:%M")
    except Exception:
        return None
def _coerce_float(value: Any) -> float | None:
    """
    Best-effort conversion of a payload scalar to a finite ``float``.

    Returns ``None`` in these cases:

    * the value is ``None`` or ``""``;
    * ``float()`` rejects it, including ints too large for a float, which
      raise OverflowError;
    * the result is not finite (NaN or +/-inf).

    Non-finite results are rejected because values from this helper are used
    as coordinates or passed to ``int()``, where NaN/inf either raise or make
    no sense.
    """
    if value is None or value == "":
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None
def apply_farm_config(payload: dict[str, Any]) -> None:
    """
    Mutate the global `src.config.config` singleton per request so the rest of
    the reporting pipeline reads farm-specific values from the n8n payload.

    Payload fields consumed (all optional):

    * ``rings``: mapping with keys ``r1``..``r5``. Numeric values become
      ``config.distance_rings`` (as ints, in key order). Missing, blank or
      non-numeric entries are skipped. The setting is left unchanged when no
      usable ring remains.
    * ``ring_colors``: copied to ``config.ring_colors`` when non-empty.
    * ``customer_name`` / ``timezone``: override ``config.wind_farm_name`` /
      ``config.timezone``. Otherwise the current values are kept, with
      ``"Wind Farm"`` as the last resort for the name.
    * ``centroid_lat`` and ``centroid_lon`` (falling back to ``centroid_lng``):
      always written, as ``None`` when absent or not numeric.
    * ``boundary_m``: always written to ``config.analysis_boundary_m`` as an
      int, or ``None``.
    * ``t_start`` / ``t_end``: epoch milliseconds, rendered in the resolved
      timezone into ``config.analysis_start_date`` /
      ``config.analysis_end_date`` when parseable.

    NOTE(review): the fields that fall back to the current config value
    (rings, colours, name, timezone, analysis dates) keep whatever an earlier
    request set, because the singleton is shared across requests. Confirm
    this is intended.
    """
    rings_obj = payload.get("rings") or {}
    ordered_rings: list[int] = []
    for key in ("r1", "r2", "r3", "r4", "r5"):
        # Null/blank/non-numeric rings count as "not configured" instead of
        # failing int().
        radius = _coerce_float(rings_obj[key]) if key in rings_obj else None
        if radius is not None:
            ordered_rings.append(int(radius))
    if ordered_rings:
        config.distance_rings = ordered_rings

    ring_colors = payload.get("ring_colors")
    if ring_colors:
        config.ring_colors = list(ring_colors)

    config.wind_farm_name = payload.get("customer_name") or config.wind_farm_name or "Wind Farm"
    config.timezone = payload.get("timezone") or config.timezone

    # Trust the farm-wide centroid provided by the n8n workflow instead of
    # recomputing it from turbine coordinates downstream.
    config.centroid_lat = _coerce_float(payload.get("centroid_lat"))
    centroid_lon = _coerce_float(payload.get("centroid_lon"))
    if centroid_lon is None:
        # Also fall back when centroid_lon is present but blank/non-numeric,
        # so such a value cannot mask a valid centroid_lng.
        centroid_lon = _coerce_float(payload.get("centroid_lng"))
    config.centroid_lon = centroid_lon

    # n8n-supplied monitoring boundary is the authoritative outer analysis radius.
    boundary_m = _coerce_float(payload.get("boundary_m"))
    config.analysis_boundary_m = int(boundary_m) if boundary_m is not None else None

    start_label = _epoch_ms_to_local_str(payload.get("t_start"), config.timezone)
    end_label = _epoch_ms_to_local_str(payload.get("t_end"), config.timezone)
    if start_label:
        config.analysis_start_date = start_label
    if end_label:
        config.analysis_end_date = end_label
def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build the ``(turbine_df, lightning_df)`` pair consumed by
    ``create_docx_report()``.

    Strike times are converted to ``payload["timezone"]`` when it is set, and
    otherwise to the currently configured ``config.timezone``. Missing or
    empty ``turbines`` / ``strikes`` lists are passed on as empty lists.
    """
    tz_name = payload.get("timezone") or config.timezone
    turbines = payload.get("turbines") or []
    strikes = payload.get("strikes") or []
    return _build_turbine_df(turbines), _build_lightning_df(strikes, tz_name)
def _epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None:
    """
    Format an epoch-milliseconds value as ISO-8601 UTC.

    The output looks like ``YYYY-MM-DDTHH:MM:SSZ``; any sub-second part is
    dropped by the format string.

    Returns ``None`` in these cases:

    * the value is ``None``, ``""`` or ``0``;
    * the value is not an integer;
    * the value cannot be represented as a timestamp. This includes infinite
      or enormous values, for which ``int()`` / ``fromtimestamp`` raise
      ``OverflowError``. Such values are absorbed so that a single corrupt
      record cannot abort the caller.
    """
    if epoch_ms in (None, "", 0):
        return None
    try:
        return (
            datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc)
            .strftime("%Y-%m-%dT%H:%M:%SZ")
        )
    except (TypeError, ValueError, OverflowError, OSError):
        return None
def _exterior_to_wkt(exterior: Any) -> str | None:
    """
    Convert an iklim.co ``polygon.exterior`` array into a WKT LINESTRING.

    Each vertex must be a dict holding ``lng`` (else ``longitude``) and
    ``lat`` (else ``latitude``). Coordinates are written as
    ``"<lng> <lat>"`` floats. Returns ``None`` when the input is not a
    non-empty list, when any vertex is malformed or non-numeric, or when
    fewer than two vertices remain.
    """
    if not (isinstance(exterior, list) and exterior):
        return None
    coords: list[str] = []
    for vertex in exterior:
        if not isinstance(vertex, dict):
            return None
        x = vertex["lng"] if "lng" in vertex else vertex.get("longitude")
        y = vertex["lat"] if "lat" in vertex else vertex.get("latitude")
        if x is None or y is None:
            return None
        try:
            x_val = float(x)
            y_val = float(y)
        except (TypeError, ValueError):
            return None
        coords.append(f"{x_val} {y_val}")
    return f"LINESTRING({', '.join(coords)})" if len(coords) >= 2 else None
def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]:
    """
    Flatten the storm-cell payload shapes n8n may send into a list of dicts.

    Accepted shapes:

    * ``[ {"thunderstorms": [...]} ]``  (n8n HTTP item wrapper)
    * ``{"thunderstorms": [...]}``      (single API response)
    * ``[ {storm}, {storm}, ... ]``     (already flat)
    * ``{"0": {storm}, "1": {storm}}``  (object-as-array)

    An "envelope" is a dict holding a list under one of
    ``_STORM_ENVELOPE_KEYS``; the first such key wins.

    For a list input, each item is handled on its own:

    * envelopes are replaced by the dicts they contain;
    * any other dict is kept as-is, including one whose envelope key holds a
      non-list such as null. Such entries are later dropped by
      ``normalize_storm_records`` when they have no cell polygon;
    * non-dicts are dropped.

    ``None`` or any other unsupported type yields ``[]``.
    """
    if raw is None:
        return []

    def _envelope_contents(container: dict[str, Any]) -> list[Any] | None:
        # First list stored under an envelope key, or None if there is none.
        for key in _STORM_ENVELOPE_KEYS:
            value = container.get(key)
            if isinstance(value, list):
                return value
        return None

    if isinstance(raw, list):
        # Decide per item, and only on list-valued envelope keys. Deciding for
        # the whole list on mere key presence would discard every record
        # whenever flat storm dicts carry e.g. a scalar "data" field, and
        # would leave envelope wrappers unexpanded when mixed with other items.
        flattened: list[dict[str, Any]] = []
        for item in raw:
            if not isinstance(item, dict):
                continue
            contents = _envelope_contents(item)
            if contents is None:
                flattened.append(item)
            else:
                flattened.extend(s for s in contents if isinstance(s, dict))
        return flattened

    if isinstance(raw, dict):
        contents = _envelope_contents(raw)
        if contents is not None:
            return [s for s in contents if isinstance(s, dict)]
        return [v for v in raw.values() if isinstance(v, dict)]

    return []
def normalize_storm_records(raw: Any) -> list[dict[str, Any]]:
    """
    Translate iklim.co `/v1/thunderstorms/within` records into the shape that
    `src.reporting.docx` and `src.visualization.storm_cells` expect.

    Records without a usable ``cell.polygon.exterior`` are skipped. Each kept
    record is a shallow copy of the input, with these keys added:

    - cell_polygon_wkt (always)
    - effective_time / expire_time / creation_time: ISO 8601 UTC, derived
      from eventStartUtcEpoch / eventEndUtcEpoch / insertedAtEpoch, and only
      set when that epoch parses
    - lightning_severity: from ``severity`` (else ``lightning_severity``),
      stripped and lower-cased
    """
    time_fields = (
        ("effective_time", "eventStartUtcEpoch"),
        ("expire_time", "eventEndUtcEpoch"),
        ("creation_time", "insertedAtEpoch"),
    )
    normalized_records: list[dict[str, Any]] = []
    for record in _unwrap_storm_envelope(raw):
        cell = record.get("cell")
        if not isinstance(cell, dict):
            cell = {}
        polygon = cell.get("polygon")
        if not isinstance(polygon, dict):
            polygon = {}
        wkt = _exterior_to_wkt(polygon.get("exterior")) if polygon else None
        if not wkt:
            continue

        item = dict(record)
        item["cell_polygon_wkt"] = wkt
        for out_key, epoch_key in time_fields:
            iso_value = _epoch_ms_to_iso_utc(record.get(epoch_key))
            if iso_value:
                item[out_key] = iso_value

        severity = record.get("severity") or record.get("lightning_severity")
        if severity is not None:
            item["lightning_severity"] = str(severity).strip().lower()
        normalized_records.append(item)
    return normalized_records