"""
Bridge between the n8n `Report: Build Payload` JSON and the existing
`create_docx_report()` entry point.

This translation layer is kept isolated so that the rest of the codebase never
needs to know about n8n.
"""
|
|
from __future__ import annotations

import logging
import math
from typing import Any

import pandas as pd

from src.config import config
from src.reporting.strings import normalize_language
from src.visualization.storm_cells import normalize_storm_records
|
|
|
|
|
|
# Canonical lightning-strike column -> accepted payload spellings.
# ``_apply_aliases`` tries the spellings in list order and compares them
# case-insensitively against the payload's column labels.
_LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = {
    # Strike position.
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    # Peak current; ``current_abs`` is later derived from it with ``abs()``.
    "current": [
        "current", "peak_current", "peakCurrent", "amplitude", "amp",
    ],
    # Flash/pulse type; stored as a string downstream.
    "p_type": ["p_type", "ptype", "type", "flash_type"],
    # Strike time: ISO strings or epoch numbers (see ``_parse_local_time_series``).
    "local_time": [
        "local_time", "localtime", "captured",
        "datetime", "date_time", "time", "timestamp",
    ],
    # Optional in-cloud height.
    "ic_height": [
        "ic_height", "inCloudHeight", "in_cloud_height", "InCloudHeight",
    ],
}
|
|
|
|
# Canonical turbine column -> accepted payload spellings.
# ``_apply_aliases`` tries the spellings in list order and compares them
# case-insensitively against the payload's column labels.
_TURBINE_COLUMN_ALIASES: dict[str, list[str]] = {
    # Required coordinates.
    "lat": ["lat", "latitude"],
    "lng": ["lng", "longitude", "lon", "long"],
    # Display label; ``_build_turbine_df`` synthesizes "T1", "T2", ... if absent.
    "name": ["name", "turbine_name", "turbine_id"],
    # Optional specification columns; filled with "N/A" when absent.
    "unit_power_mwm": ["unit_power_mwm", "power_mwm"],
    "unit_power_mwe": ["unit_power_mwe", "power_mwe"],
    "tower_height_m": ["tower_height_m", "tower_height"],
    "turbine_rotor_blade_diameter": [
        "turbine_rotor_blade_diameter", "rotor_diameter", "rotor_blade_diameter",
    ],
    "altitude": ["altitude", "elevation"],
}
|
|
|
|
|
|
def _apply_aliases(df: pd.DataFrame, aliases: dict[str, list[str]]) -> pd.DataFrame:
    """
    Rename alternative column spellings in *df* to their canonical names.

    *aliases* maps each canonical column name to candidate spellings. A target
    that already exists verbatim as a column is left alone. Otherwise the
    candidates are tried in order, compared case-insensitively against the
    existing column labels, and the first match not already claimed by an
    earlier target is renamed to that target.

    A frame with no rows (or no columns) is returned unchanged, as is *df*
    itself when nothing needs renaming; otherwise a renamed copy is returned.
    """
    if df.empty:
        return df

    # Lower-cased label -> real label; on a case-only collision the later
    # column wins.
    actual_by_lower = {}
    for label in df.columns:
        actual_by_lower[str(label).lower()] = label

    renames = {}
    for target, spellings in aliases.items():
        if target in df.columns:
            continue
        matches = (actual_by_lower.get(spelling.lower()) for spelling in spellings)
        source = next(
            (m for m in matches if m is not None and m not in renames),
            None,
        )
        if source is not None:
            renames[source] = target

    if not renames:
        return df
    return df.rename(columns=renames)
|
|
|
|
|
|
def _parse_local_time_series(series: pd.Series) -> pd.Series:
    """
    Parse strike timestamps from ISO strings or epoch values into UTC datetimes.

    n8n may add a numeric ``timestamp`` field (epoch ms). Pandas treats bare
    integers as nanoseconds by default, which collapses ms epochs to ~1970.
    Numeric input is therefore given an epoch unit chosen from the magnitude
    of its first non-null value:

    * ``>= 1e18`` -> nanoseconds
    * ``>= 1e15`` -> microseconds
    * ``>= 1e12`` -> milliseconds
    * ``>= 1e9``  -> seconds

    Each threshold is the same instant (2001-09-09) in its unit. Values below
    1e9 fall through to pandas' default parsing. Anything that cannot be parsed
    becomes ``NaT``. An empty series is returned unchanged.
    """
    if series.empty:
        return series

    # Strings (ISO-8601 etc.) are tried first. If nothing parses, the values
    # may be numeric epochs that arrived as strings, so fall through.
    if not pd.api.types.is_numeric_dtype(series):
        parsed = pd.to_datetime(series, errors="coerce", utc=True)
        if parsed.notna().any():
            return parsed

    numeric = pd.to_numeric(series, errors="coerce")
    non_null = numeric.dropna()
    if not non_null.empty:
        magnitude = float(non_null.iloc[0])
        # Largest unit first. Before us/ns were recognised, such values were
        # read as ms, which is far outside pandas' datetime range, and became NaT.
        for threshold, unit in ((1e18, "ns"), (1e15, "us"), (1e12, "ms"), (1e9, "s")):
            if magnitude >= threshold:
                return pd.to_datetime(numeric, unit=unit, errors="coerce", utc=True)

    return pd.to_datetime(series, errors="coerce", utc=True)
|
|
|
|
|
|
def _build_turbine_df(turbines: list[dict[str, Any]]) -> pd.DataFrame:
    """
    Build the turbine table for the report from the payload's turbine records.

    Column spellings are normalized with ``_TURBINE_COLUMN_ALIASES``. ``lat``
    and ``lng`` are required and coerced to numbers, and rows where either is
    missing or non-numeric are dropped. Turbines without a name (either no
    ``name`` column at all, or a null value) are labelled ``T1``, ``T2``, ...
    by position. Optional specification columns that are absent are filled
    with ``"N/A"``.

    An empty *turbines* list yields an empty frame with the same column set as
    the populated path (``name``, ``lat``, ``lng`` plus the optional columns).

    Raises:
        ValueError: if ``lat`` or ``lng`` is missing after alias normalization.
    """
    optional_columns = (
        "unit_power_mwm",
        "unit_power_mwe",
        "tower_height_m",
        "turbine_rotor_blade_diameter",
        "altitude",
    )

    if not turbines:
        # Keep the schema identical to the non-empty path so downstream
        # column lookups do not depend on whether the farm had turbines.
        return pd.DataFrame(columns=["name", "lat", "lng", *optional_columns])

    df = _apply_aliases(pd.DataFrame(turbines), _TURBINE_COLUMN_ALIASES)

    missing = [c for c in ("lat", "lng") if c not in df.columns]
    if missing:
        raise ValueError(
            f"Turbine payload missing required column(s) after normalization: {missing}"
        )

    for coord in ("lat", "lng"):
        df[coord] = pd.to_numeric(df[coord], errors="coerce")
    df = df.dropna(subset=["lat", "lng"]).reset_index(drop=True)

    fallback_names = [f"T{i + 1}" for i in range(len(df))]
    if "name" not in df.columns:
        df["name"] = fallback_names
    elif df["name"].isna().any():
        # Give unnamed rows the same positional label as the no-name-column
        # case. The whole column is reassigned so pandas does not complain
        # about mixing dtypes in place.
        df["name"] = [
            fallback if pd.isna(current) else current
            for current, fallback in zip(df["name"], fallback_names)
        ]

    for column in optional_columns:
        if column not in df.columns:
            df[column] = "N/A"

    return df
|
|
|
|
|
|
def _build_lightning_df(
    strikes: list[dict[str, Any]],
    timezone_name: str | None,
) -> pd.DataFrame:
    """
    Build the lightning-strike table for the report from the payload's strikes.

    Column spellings are normalized with ``_LIGHTNING_COLUMN_ALIASES``.
    ``lat``, ``lng``, ``current``, ``p_type`` and ``local_time`` are required.
    The frame is then cleaned as follows:

    * coordinates, ``current``, ``current_abs`` and (when present)
      ``ic_height`` are coerced to numbers;
    * ``p_type`` is cast to ``str``;
    * ``local_time`` is parsed to UTC by ``_parse_local_time_series`` and
      converted to *timezone_name* when one is given and the conversion
      succeeds;
    * rows lacking coordinates or a parseable time are dropped.

    ``current_abs`` is derived as ``abs(current)`` unless the payload supplies it.

    An empty *strikes* list yields an empty frame with the base columns.

    Raises:
        ValueError: if a required column is missing after alias normalization.
    """
    base_columns = ["lat", "lng", "current", "p_type", "local_time", "current_abs"]
    if not strikes:
        return pd.DataFrame(columns=base_columns)

    df = _apply_aliases(pd.DataFrame(strikes), _LIGHTNING_COLUMN_ALIASES)

    missing = [c for c in ("lat", "lng", "current", "p_type", "local_time") if c not in df.columns]
    if missing:
        raise ValueError(
            f"Lightning payload missing required column(s) after normalization: {missing}"
        )

    for column in ("lat", "lng", "current"):
        df[column] = pd.to_numeric(df[column], errors="coerce")
    df["p_type"] = df["p_type"].astype(str)

    local_time = _parse_local_time_series(df["local_time"])
    if timezone_name:
        try:
            local_time = local_time.dt.tz_convert(timezone_name)
        except Exception:
            # Best-effort: an unusable timezone must not fail the whole report.
            # Times stay in UTC, but the problem is logged instead of silently
            # swallowed.
            logging.getLogger(__name__).warning(
                "Could not convert strike times to timezone %r; leaving them in UTC.",
                timezone_name,
                exc_info=True,
            )
    df["local_time"] = local_time

    df = df.dropna(subset=["lat", "lng", "local_time"]).reset_index(drop=True)

    if "current_abs" in df.columns:
        # A payload-supplied value must be numeric, just like the derived one.
        df["current_abs"] = pd.to_numeric(df["current_abs"], errors="coerce")
    else:
        df["current_abs"] = df["current"].abs()

    if "ic_height" in df.columns:
        df["ic_height"] = pd.to_numeric(df["ic_height"], errors="coerce")

    return df
|
|
|
|
|
|
def _epoch_ms_to_local_str(epoch_ms: Any, timezone_name: str | None) -> str | None:
    """
    Format an epoch-milliseconds value as a ``DD-MM-YYYY HH:MM`` label.

    *epoch_ms* may be an int, a float or a numeric string (for example
    ``"1700000000000"`` or ``"1700000000000.0"``). The following are all
    treated as "not provided" and yield ``None``:

    * ``None`` and empty strings;
    * zero, in any of those forms;
    * non-numeric values;
    * NaN and infinity.

    When *timezone_name* is given, the UTC instant is converted to it.
    Otherwise the label is in UTC.

    Returns ``None`` instead of raising when conversion or formatting fails,
    for example on an unknown timezone or an out-of-range epoch.
    """
    # float() rather than int() so decimal strings are accepted too.
    try:
        millis = float(epoch_ms)
    except (TypeError, ValueError):
        return None
    if not math.isfinite(millis) or millis == 0:
        return None

    try:
        ts = pd.to_datetime(millis, unit="ms", utc=True)
        if timezone_name:
            ts = ts.tz_convert(timezone_name)
        return ts.strftime("%d-%m-%Y %H:%M")
    except Exception:
        # Deliberately best-effort: a missing label is preferable to failing
        # the whole report.
        return None
|
|
|
|
|
|
def _coerce_float(value: Any) -> float | None:
    """
    Convert a payload value to a finite float, or ``None`` if that is not possible.

    The following all map to ``None``:

    * ``None`` and ``""``;
    * non-numeric values;
    * numbers too large for a float;
    * non-finite results (NaN and infinity, including strings such as
      ``"nan"`` and ``"inf"``).

    Rejecting non-finite values matters because callers treat any non-``None``
    result as usable; ``apply_farm_config``, for example, passes the boundary
    to ``int()``, which raises on NaN or infinity.
    """
    if value is None or value == "":
        return None
    try:
        result = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: e.g. float(10**400).
        return None
    return result if math.isfinite(result) else None
|
|
|
|
|
|
def apply_farm_config(payload: dict[str, Any]) -> None:
    """
    Mutate the global `src.config.config` singleton per request so the rest of
    the reporting pipeline reads farm-specific values from the n8n payload.

    Payload fields and the config attributes they set:

    * ``rings`` (keys ``r1`` .. ``r5``) -> ``config.distance_rings``, as ints
      in key order. Null or blank entries are skipped.
    * ``ring_colors`` -> ``config.ring_colors``.
    * ``customer_name`` -> ``config.wind_farm_name``. Falls back to the current
      value, then to ``"Wind Farm"``.
    * ``timezone`` -> ``config.timezone``. Falls back to the current value.
    * ``centroid_lat`` -> ``config.centroid_lat``.
    * ``centroid_lon`` (or ``centroid_lng``) -> ``config.centroid_lon``.
    * ``boundary_m`` -> ``config.analysis_boundary_m``.
    * ``t_start`` / ``t_end`` (epoch ms) -> ``config.analysis_start_date`` /
      ``config.analysis_end_date``.
    * ``language`` -> ``config.language``, via ``normalize_language``.

    NOTE(review): rings, ring colors and the start/end labels are assigned only
    when the payload provides them. Because ``config`` is a shared singleton, a
    request that omits them keeps whatever value was already there, possibly
    from an earlier request. Centroid and boundary, by contrast, are always
    overwritten (possibly with ``None``). Confirm this is intended.

    Raises:
        ValueError: if a ring value is present but is not a finite number.
    """
    rings_obj = payload.get("rings") or {}
    ordered_rings: list[int] = []
    for key in ("r1", "r2", "r3", "r4", "r5"):
        raw = rings_obj.get(key) if isinstance(rings_obj, dict) else None
        # An explicit null/blank ring is treated like an absent one; it used to
        # crash in int(None).
        if raw is None or raw == "":
            continue
        # float() first so numeric strings like "1500.0" are accepted.
        # Garbage still raises ValueError, as before.
        value = float(raw)
        if not math.isfinite(value):
            raise ValueError(f"Ring {key!r} must be a finite number, got {raw!r}")
        ordered_rings.append(int(value))
    if ordered_rings:
        config.distance_rings = ordered_rings

    ring_colors = payload.get("ring_colors")
    if ring_colors:
        config.ring_colors = list(ring_colors)

    config.wind_farm_name = payload.get("customer_name") or config.wind_farm_name or "Wind Farm"
    config.timezone = payload.get("timezone") or config.timezone

    # Trust the farm-wide centroid provided by the n8n workflow instead of
    # recomputing it from turbine coordinates downstream. ``centroid_lng`` is
    # accepted as an alternative spelling and is also used when
    # ``centroid_lon`` is present but not a usable number.
    config.centroid_lat = _coerce_float(payload.get("centroid_lat"))
    centroid_lon = _coerce_float(payload.get("centroid_lon"))
    if centroid_lon is None:
        centroid_lon = _coerce_float(payload.get("centroid_lng"))
    config.centroid_lon = centroid_lon

    # n8n-supplied monitoring boundary is the authoritative outer analysis radius.
    boundary_m = _coerce_float(payload.get("boundary_m"))
    # The finiteness guard keeps int() from raising on NaN/inf.
    config.analysis_boundary_m = (
        int(boundary_m) if boundary_m is not None and math.isfinite(boundary_m) else None
    )

    start_label = _epoch_ms_to_local_str(payload.get("t_start"), config.timezone)
    end_label = _epoch_ms_to_local_str(payload.get("t_end"), config.timezone)
    if start_label:
        config.analysis_start_date = start_label
    if end_label:
        config.analysis_end_date = end_label

    config.language = normalize_language(payload.get("language"))
|
|
|
|
|
|
def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Convert an n8n report payload into the frames `create_docx_report()` consumes.

    Missing or empty ``turbines`` / ``strikes`` entries are treated as empty
    lists. The payload's ``timezone`` (or ``config.timezone`` when absent) is
    passed on as the target timezone for strike times.

    Returns:
        ``(turbine_df, lightning_df)``
    """
    tz = payload.get("timezone") or config.timezone
    turbines = payload.get("turbines") or []
    strikes = payload.get("strikes") or []
    return _build_turbine_df(turbines), _build_lightning_df(strikes, tz)
|