338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""
|
|
Bridge between the n8n `Report: Build Payload` JSON and the existing
|
|
`create_docx_report()` entry point.
|
|
|
|
Keeping this isolated so the rest of the codebase never learns about n8n.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
|
|
from src.config import config
|
|
from src.reporting.strings import normalize_language
|
|
|
|
_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items")
|
|
|
|
|
|
_LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = {
|
|
"lat": ["lat", "latitude"],
|
|
"lng": ["lng", "longitude", "lon", "long"],
|
|
"current": ["current", "peak_current", "peakCurrent", "amplitude", "amp"],
|
|
"p_type": ["p_type", "ptype", "type", "flash_type"],
|
|
"local_time": [
|
|
"local_time",
|
|
"localtime",
|
|
"captured",
|
|
"datetime",
|
|
"date_time",
|
|
"time",
|
|
"timestamp",
|
|
],
|
|
}
|
|
|
|
_TURBINE_COLUMN_ALIASES: dict[str, list[str]] = {
|
|
"lat": ["lat", "latitude"],
|
|
"lng": ["lng", "longitude", "lon", "long"],
|
|
"name": ["name", "turbine_name", "turbine_id"],
|
|
"unit_power_mwm": ["unit_power_mwm", "power_mwm"],
|
|
"unit_power_mwe": ["unit_power_mwe", "power_mwe"],
|
|
"tower_height_m": ["tower_height_m", "tower_height"],
|
|
"turbine_rotor_blade_diameter": [
|
|
"turbine_rotor_blade_diameter",
|
|
"rotor_diameter",
|
|
"rotor_blade_diameter",
|
|
],
|
|
"altitude": ["altitude", "elevation"],
|
|
}
|
|
|
|
|
|
def _apply_aliases(df: pd.DataFrame, aliases: dict[str, list[str]]) -> pd.DataFrame:
|
|
if df.empty:
|
|
return df
|
|
lower_to_actual = {str(c).lower(): c for c in df.columns}
|
|
rename_map: dict[str, str] = {}
|
|
for target, candidates in aliases.items():
|
|
if target in df.columns:
|
|
continue
|
|
for candidate in candidates:
|
|
src = lower_to_actual.get(candidate.lower())
|
|
if src is not None and src not in rename_map:
|
|
rename_map[src] = target
|
|
break
|
|
return df.rename(columns=rename_map) if rename_map else df
|
|
|
|
|
|
def _parse_local_time_series(series: pd.Series) -> pd.Series:
|
|
"""
|
|
Parse strike timestamps from ISO strings or epoch values.
|
|
|
|
n8n may add a numeric ``timestamp`` field (epoch ms). Pandas treats bare
|
|
integers as nanoseconds by default, which collapses ms epochs to ~1970.
|
|
"""
|
|
if series.empty:
|
|
return series
|
|
|
|
if not pd.api.types.is_numeric_dtype(series):
|
|
parsed = pd.to_datetime(series, errors="coerce", utc=True)
|
|
if parsed.notna().any():
|
|
return parsed
|
|
|
|
numeric = pd.to_numeric(series, errors="coerce")
|
|
if numeric.notna().any():
|
|
sample = float(numeric.dropna().iloc[0])
|
|
if sample >= 1e12:
|
|
return pd.to_datetime(numeric, unit="ms", errors="coerce", utc=True)
|
|
if sample >= 1e9:
|
|
return pd.to_datetime(numeric, unit="s", errors="coerce", utc=True)
|
|
|
|
return pd.to_datetime(series, errors="coerce", utc=True)
|
|
|
|
|
|
def _build_turbine_df(turbines: list[dict[str, Any]]) -> pd.DataFrame:
|
|
if not turbines:
|
|
return pd.DataFrame(columns=["name", "lat", "lng"])
|
|
df = pd.DataFrame(turbines)
|
|
df = _apply_aliases(df, _TURBINE_COLUMN_ALIASES)
|
|
|
|
missing = [c for c in ("lat", "lng") if c not in df.columns]
|
|
if missing:
|
|
raise ValueError(
|
|
f"Turbine payload missing required column(s) after normalization: {missing}"
|
|
)
|
|
|
|
df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
|
|
df["lng"] = pd.to_numeric(df["lng"], errors="coerce")
|
|
df = df.dropna(subset=["lat", "lng"]).reset_index(drop=True)
|
|
|
|
if "name" not in df.columns:
|
|
df["name"] = [f"T{i + 1}" for i in range(len(df))]
|
|
|
|
for optional_col in (
|
|
"unit_power_mwm",
|
|
"unit_power_mwe",
|
|
"tower_height_m",
|
|
"turbine_rotor_blade_diameter",
|
|
"altitude",
|
|
):
|
|
if optional_col not in df.columns:
|
|
df[optional_col] = "N/A"
|
|
|
|
return df
|
|
|
|
|
|
def _build_lightning_df(
|
|
strikes: list[dict[str, Any]],
|
|
timezone_name: str | None,
|
|
) -> pd.DataFrame:
|
|
base_columns = ["lat", "lng", "current", "p_type", "local_time", "current_abs"]
|
|
if not strikes:
|
|
return pd.DataFrame(columns=base_columns)
|
|
|
|
df = pd.DataFrame(strikes)
|
|
df = _apply_aliases(df, _LIGHTNING_COLUMN_ALIASES)
|
|
|
|
missing = [c for c in ("lat", "lng", "current", "p_type", "local_time") if c not in df.columns]
|
|
if missing:
|
|
raise ValueError(
|
|
f"Lightning payload missing required column(s) after normalization: {missing}"
|
|
)
|
|
|
|
df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
|
|
df["lng"] = pd.to_numeric(df["lng"], errors="coerce")
|
|
df["current"] = pd.to_numeric(df["current"], errors="coerce")
|
|
df["p_type"] = df["p_type"].astype(str)
|
|
|
|
local_time = _parse_local_time_series(df["local_time"])
|
|
if timezone_name:
|
|
try:
|
|
local_time = local_time.dt.tz_convert(timezone_name)
|
|
except Exception:
|
|
pass
|
|
df["local_time"] = local_time
|
|
|
|
df = df.dropna(subset=["lat", "lng", "local_time"]).reset_index(drop=True)
|
|
|
|
if "current_abs" not in df.columns:
|
|
df["current_abs"] = df["current"].abs()
|
|
|
|
return df
|
|
|
|
|
|
def _epoch_ms_to_local_str(epoch_ms: Any, timezone_name: str | None) -> str | None:
|
|
if epoch_ms in (None, "", 0):
|
|
return None
|
|
try:
|
|
ts = pd.to_datetime(int(epoch_ms), unit="ms", utc=True)
|
|
if timezone_name:
|
|
ts = ts.tz_convert(timezone_name)
|
|
return ts.strftime("%d-%m-%Y %H:%M")
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _coerce_float(value: Any) -> float | None:
|
|
if value is None or value == "":
|
|
return None
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def apply_farm_config(payload: dict[str, Any]) -> None:
|
|
"""
|
|
Mutate the global `src.config.config` singleton per request so the rest of
|
|
the reporting pipeline reads farm-specific values from the n8n payload.
|
|
"""
|
|
rings_obj = payload.get("rings") or {}
|
|
ordered_rings = [int(rings_obj[k]) for k in ("r1", "r2", "r3", "r4", "r5") if k in rings_obj]
|
|
if ordered_rings:
|
|
config.distance_rings = ordered_rings
|
|
|
|
ring_colors = payload.get("ring_colors")
|
|
if ring_colors:
|
|
config.ring_colors = list(ring_colors)
|
|
|
|
config.wind_farm_name = payload.get("customer_name") or config.wind_farm_name or "Wind Farm"
|
|
config.timezone = payload.get("timezone") or config.timezone
|
|
|
|
# Trust the farm-wide centroid provided by the n8n workflow instead of
|
|
# recomputing it from turbine coordinates downstream.
|
|
centroid_lat = _coerce_float(payload.get("centroid_lat"))
|
|
centroid_lon = _coerce_float(
|
|
payload.get("centroid_lon")
|
|
if payload.get("centroid_lon") is not None
|
|
else payload.get("centroid_lng")
|
|
)
|
|
config.centroid_lat = centroid_lat
|
|
config.centroid_lon = centroid_lon
|
|
|
|
# n8n-supplied monitoring boundary is the authoritative outer analysis radius.
|
|
boundary_m = _coerce_float(payload.get("boundary_m"))
|
|
config.analysis_boundary_m = int(boundary_m) if boundary_m is not None else None
|
|
|
|
start_label = _epoch_ms_to_local_str(payload.get("t_start"), config.timezone)
|
|
end_label = _epoch_ms_to_local_str(payload.get("t_end"), config.timezone)
|
|
if start_label:
|
|
config.analysis_start_date = start_label
|
|
if end_label:
|
|
config.analysis_end_date = end_label
|
|
|
|
config.language = normalize_language(payload.get("language"))
|
|
|
|
|
|
def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFrame]:
|
|
"""Return (turbine_df, lightning_df) ready for `create_docx_report()`."""
|
|
timezone_name = payload.get("timezone") or config.timezone
|
|
turbine_df = _build_turbine_df(payload.get("turbines") or [])
|
|
lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name)
|
|
return turbine_df, lightning_df
|
|
|
|
|
|
def _epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None:
|
|
if epoch_ms in (None, "", 0):
|
|
return None
|
|
try:
|
|
return (
|
|
datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc)
|
|
.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
)
|
|
except (TypeError, ValueError, OSError):
|
|
return None
|
|
|
|
|
|
def _exterior_to_wkt(exterior: Any) -> str | None:
|
|
"""Convert iklim.co `polygon.exterior` array into a WKT LINESTRING."""
|
|
if not isinstance(exterior, list) or not exterior:
|
|
return None
|
|
pairs: list[str] = []
|
|
for point in exterior:
|
|
if not isinstance(point, dict):
|
|
return None
|
|
lng = point.get("lng") if "lng" in point else point.get("longitude")
|
|
lat = point.get("lat") if "lat" in point else point.get("latitude")
|
|
if lng is None or lat is None:
|
|
return None
|
|
try:
|
|
pairs.append(f"{float(lng)} {float(lat)}")
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if len(pairs) < 2:
|
|
return None
|
|
return f"LINESTRING({', '.join(pairs)})"
|
|
|
|
|
|
def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]:
|
|
"""
|
|
Accept any of these shapes coming from n8n and return a flat list of
|
|
storm-cell dicts:
|
|
|
|
* [ { "thunderstorms": [...] } ] (n8n HTTP item wrapper)
|
|
* { "thunderstorms": [...] } (single API response)
|
|
* [ {storm}, {storm}, ... ] (already flat)
|
|
* { "0": {storm}, "1": {storm} } (object-as-array)
|
|
"""
|
|
if raw is None:
|
|
return []
|
|
|
|
if isinstance(raw, list):
|
|
if raw and all(isinstance(item, dict) and any(k in item for k in _STORM_ENVELOPE_KEYS) for item in raw):
|
|
unwrapped: list[dict[str, Any]] = []
|
|
for item in raw:
|
|
for key in _STORM_ENVELOPE_KEYS:
|
|
if key in item and isinstance(item[key], list):
|
|
unwrapped.extend(s for s in item[key] if isinstance(s, dict))
|
|
break
|
|
return unwrapped
|
|
return [s for s in raw if isinstance(s, dict)]
|
|
|
|
if isinstance(raw, dict):
|
|
for key in _STORM_ENVELOPE_KEYS:
|
|
if key in raw and isinstance(raw[key], list):
|
|
return [s for s in raw[key] if isinstance(s, dict)]
|
|
return [v for v in raw.values() if isinstance(v, dict)]
|
|
|
|
return []
|
|
|
|
|
|
def normalize_storm_records(raw: Any) -> list[dict[str, Any]]:
|
|
"""
|
|
Translate iklim.co `/v1/thunderstorms/within` records into the shape that
|
|
`src.reporting.docx` and `src.visualization.storm_cells` expect.
|
|
|
|
Required output keys per record:
|
|
- cell_polygon_wkt
|
|
- effective_time / expire_time / creation_time (ISO 8601 UTC)
|
|
- lightning_severity
|
|
"""
|
|
out: list[dict[str, Any]] = []
|
|
for record in _unwrap_storm_envelope(raw):
|
|
cell = record.get("cell") if isinstance(record.get("cell"), dict) else {}
|
|
polygon = cell.get("polygon") if isinstance(cell.get("polygon"), dict) else {}
|
|
wkt = _exterior_to_wkt(polygon.get("exterior")) if polygon else None
|
|
if not wkt:
|
|
continue
|
|
|
|
effective = _epoch_ms_to_iso_utc(record.get("eventStartUtcEpoch"))
|
|
expire = _epoch_ms_to_iso_utc(record.get("eventEndUtcEpoch"))
|
|
created = _epoch_ms_to_iso_utc(record.get("insertedAtEpoch"))
|
|
|
|
normalized = dict(record)
|
|
normalized["cell_polygon_wkt"] = wkt
|
|
if effective:
|
|
normalized["effective_time"] = effective
|
|
if expire:
|
|
normalized["expire_time"] = expire
|
|
if created:
|
|
normalized["creation_time"] = created
|
|
|
|
severity = record.get("severity") or record.get("lightning_severity")
|
|
if severity is not None:
|
|
normalized["lightning_severity"] = str(severity).strip().lower()
|
|
|
|
out.append(normalized)
|
|
return out
|