from __future__ import annotations
from typing import Any, Callable
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from src.config import config
from src.reporting.precompute import precompute_distances_and_rings
from src.reporting.strings import ReportLanguage, get_report_language, get_strings
def _parse_epoch_to_farm_local(numeric: pd.Series, tz_name: str | None) -> pd.Series:
numeric_valid = numeric.dropna()
if len(numeric_valid) == 0:
return pd.to_datetime(numeric, errors="coerce")
abs_max = float(numeric_valid.abs().max())
if abs_max >= 1e17:
unit = "ns"
elif abs_max >= 1e14:
unit = "us"
elif abs_max >= 1e11:
unit = "ms"
else:
unit = "s"
parsed = pd.to_datetime(numeric, errors="coerce", unit=unit, utc=True)
if getattr(parsed.dt, "tz", None) is not None and tz_name:
parsed = parsed.dt.tz_convert(tz_name)
return parsed.dt.tz_localize(None) if getattr(parsed.dt, "tz", None) is not None else parsed
def _farm_local_naive_times(values: pd.Series) -> pd.Series:
"""Normalize strike timestamps to naive farm-local wall clock."""
tz_name = config.timezone
if pd.api.types.is_datetime64_any_dtype(values):
parsed = pd.to_datetime(values, errors="coerce")
elif pd.api.types.is_numeric_dtype(values):
parsed = _parse_epoch_to_farm_local(values, tz_name)
else:
numeric = pd.to_numeric(values, errors="coerce")
numeric_valid = numeric.dropna()
if len(numeric_valid) > 0 and len(numeric_valid) >= max(1, int(len(values) * 0.7)):
parsed = _parse_epoch_to_farm_local(numeric, tz_name)
else:
parsed = pd.to_datetime(values, errors="coerce")
if getattr(parsed.dt, "tz", None) is None:
return parsed
if tz_name:
parsed = parsed.dt.tz_convert(tz_name)
return parsed.dt.tz_localize(None)
def _chart_xaxis_range(time_values: np.ndarray) -> tuple[pd.Timestamp, pd.Timestamp]:
start_label = getattr(config, "analysis_start_date", None)
end_label = getattr(config, "analysis_end_date", None)
if start_label and end_label:
try:
return (
pd.to_datetime(str(start_label).strip(), format="%d-%m-%Y %H:%M"),
pd.to_datetime(str(end_label).strip(), format="%d-%m-%Y %H:%M"),
)
except ValueError:
pass
times = pd.to_datetime(time_values)
x_min = pd.Timestamp(times.min())
x_max = pd.Timestamp(times.max())
span = x_max - x_min
padding = max(pd.Timedelta(minutes=1), span * 0.1)
if span == pd.Timedelta(0):
padding = pd.Timedelta(minutes=2)
return x_min - padding, x_max + padding
def _chart_xaxis_tick_config(
x_min: pd.Timestamp,
x_max: pd.Timestamp,
max_ticks: int = 6,
) -> tuple[int, str]:
span_minutes = max(1.0, (x_max - x_min).total_seconds() / 60.0)
nice_intervals_min = (1, 2, 5, 10, 15, 30, 60, 120, 180, 240, 360, 720, 1440)
dtick_min = nice_intervals_min[-1]
for interval in nice_intervals_min:
if span_minutes / interval <= max_ticks:
dtick_min = interval
break
tickformat = "%H:%M" if x_min.date() == x_max.date() else "%d-%m-%Y %H:%M"
return dtick_min * 60 * 1000, tickformat
def _build_current_vs_distance_chart(
lightning_df: pd.DataFrame,
dists_km: np.ndarray,
mask_within: np.ndarray,
lightning_type_filter: str,
title: str,
fig_width: int,
fig_height: int,
lang: ReportLanguage | None = None,
) -> go.Figure | None:
s = get_strings(lang or get_report_language())
if lightning_type_filter == "cg":
type_mask = lightning_df["p_type"].astype(str) == "0"
else:
type_mask = lightning_df["p_type"].astype(str) != "0"
combined_mask = mask_within & type_mask
if combined_mask.sum() == 0:
return None
subset = lightning_df.loc[combined_mask].copy()
distances = dists_km[combined_mask]
currents = subset["current"].values.astype(float)
display_times = _farm_local_naive_times(subset["local_time"])
valid_mask = ~display_times.isna()
if valid_mask.sum() == 0:
return None
display_times = display_times.loc[valid_mask]
distances = distances[valid_mask.values]
currents = currents[valid_mask.values]
sort_idx = np.argsort(display_times.values.astype("datetime64[ns]"))
time_values = display_times.values[sort_idx]
distances = distances[sort_idx]
currents = currents[sort_idx]
rings_km = np.array(config.distance_rings, dtype=float) / 1000.0
ring_colors_cfg = getattr(config, "ring_colors", None) or []
ring_indices = np.searchsorted(rings_km, distances, side="left").astype(int)
ring_indices = np.clip(ring_indices, 0, len(rings_km) - 1)
ring_names: list[str] = []
for i in range(len(rings_km)):
if i == 0:
ring_names.append(f"0-{rings_km[0]:.1f} km")
else:
ring_names.append(f"{rings_km[i - 1]:.1f}-{rings_km[i]:.1f} km")
fig = go.Figure()
for i in range(len(rings_km)):
mask_ring = ring_indices == i
if mask_ring.sum() == 0:
continue
color = ring_colors_cfg[i] if i < len(ring_colors_cfg) else "gray"
r_times = time_values[mask_ring]
r_currents = currents[mask_ring]
r_dists = distances[mask_ring]
tz_suffix = f" ({config.timezone})" if config.timezone else ""
r_time_labels = pd.to_datetime(r_times).strftime("%d-%m-%Y %H:%M").values
r_time_labels = np.array([f"{t}{tz_suffix}" for t in r_time_labels])
fig.add_trace(
go.Scatter(
x=r_times,
y=r_currents,
mode="markers",
name=ring_names[i],
marker=dict(size=10, opacity=0.8, color=color),
customdata=np.column_stack((r_dists, r_time_labels)),
hovertemplate=(
f"{s.chart_time}: %{{customdata[1]}}
"
f"{s.chart_current}: %{{y:.0f}} A
"
f"{s.chart_distance}: %{{customdata[0]}} km
"
f"{s.chart_ring}: " + ring_names[i] + "
"
""
),
)
)
timezone_label = config.timezone or "UTC"
x_min, x_max = _chart_xaxis_range(time_values)
dtick, tickformat = _chart_xaxis_tick_config(x_min, x_max)
fig.update_layout(
font=dict(size=16),
title=dict(text=title, x=0.5, font=dict(size=22)),
xaxis_title=f"{s.chart_time} ({timezone_label})",
yaxis_title=s.chart_current_axis,
plot_bgcolor="white",
paper_bgcolor="white",
xaxis=dict(
type="date",
showgrid=True,
gridcolor="lightgray",
zeroline=False,
range=[x_min, x_max],
tickformat=tickformat,
dtick=dtick,
tickangle=-25,
tickfont=dict(size=22),
title_font=dict(size=28),
),
yaxis=dict(
showgrid=True,
gridcolor="lightgray",
zeroline=False,
tickfont=dict(size=22),
title_font=dict(size=28),
),
legend=dict(
title=s.chart_distance_ring,
orientation="h",
x=0.5,
xanchor="center",
y=-0.28,
yanchor="top",
bgcolor="rgba(255,255,255,0.8)",
bordercolor="black",
borderwidth=1,
font=dict(size=20),
title_font=dict(size=24),
),
width=fig_width,
height=fig_height,
margin=dict(l=70, r=40, t=50, b=130),
)
return fig
def _turbine_cell_value_is_available(value: Any) -> bool:
if value is None:
return False
try:
if pd.isna(value):
return False
except (TypeError, ValueError):
pass
text = str(value).strip()
if not text:
return False
if text.lower() in {"n/a", "na", "nan", "none", "-", "--"}:
return False
return True
def build_turbine_information_table_data(
turbine_df: pd.DataFrame,
lang: ReportLanguage | None = None,
) -> list[list[str]]:
s = get_strings(lang or get_report_language())
column_specs: list[tuple[str, str, Callable[[pd.Series], str], bool]] = [
(s.col_turbine, "name", lambda t: str(t.get("name", "N/A")), True),
(s.col_lat, "lat", lambda t: f"{t.get('lat', 0):.4f}", True),
(s.col_lng, "lng", lambda t: f"{t.get('lng', 0):.4f}", True),
(s.col_unit_power_mwm, "unit_power_mwm", lambda t: str(t.get("unit_power_mwm", "N/A")), False),
(s.col_unit_power_mwe, "unit_power_mwe", lambda t: str(t.get("unit_power_mwe", "N/A")), False),
(s.col_tower_height, "tower_height_m", lambda t: str(t.get("tower_height_m", "N/A")), False),
(
s.col_rotor_diameter,
"turbine_rotor_blade_diameter",
lambda t: str(t.get("turbine_rotor_blade_diameter", "N/A")),
False,
),
(s.col_altitude, "altitude", lambda t: str(t.get("altitude", "N/A")), False),
]
active_columns: list[tuple[str, Callable[[pd.Series], str]]] = []
for header, field, formatter, always_include in column_specs:
if always_include:
active_columns.append((header, formatter))
continue
if any(_turbine_cell_value_is_available(row.get(field)) for _, row in turbine_df.iterrows()):
active_columns.append((header, formatter))
rows: list[list[str]] = [[header for header, _ in active_columns]]
for _, turbine in turbine_df.iterrows():
rows.append([formatter(turbine) for _, formatter in active_columns])
return rows
def build_lightning_event_table_data(
centroid_lat: float,
centroid_lng: float,
lightning_df: pd.DataFrame,
lang: ReportLanguage | None = None,
) -> tuple[list[list[str]], list[str]]:
s = get_strings(lang or get_report_language())
pre = precompute_distances_and_rings(
centroid_lat, centroid_lng, lightning_df, config.distance_rings
)
rows: list[list[str]] = []
row_colors: list[str] = []
outermost_km = max(config.distance_rings) / 1000.0
rings_km = [r / 1000.0 for r in config.distance_rings]
for i, rec in enumerate(lightning_df.itertuples(index=False)):
proximity = float(pre["dists_km"][i])
if proximity > outermost_km:
continue
ri = int(pre["ring_idx"][i])
if ri >= len(rings_km):
continue
color = config.ring_colors[ri]
try:
from src.utils import format_datetime_ddmmyyyy_hhmmss
dt_val = (
rec.local_time
if not isinstance(rec.local_time, str)
else pd.to_datetime(str(rec.local_time)[:19])
)
local_time = format_datetime_ddmmyyyy_hhmmss(pd.to_datetime(dt_val))
except Exception:
local_time = str(getattr(rec, "local_time", ""))[:19]
lightning_type = s.lightning_type_cg if str(rec.p_type) == "0" else s.lightning_type_ic
height_val = getattr(rec, "ic_height", "")
if height_val == "":
height_val = getattr(rec, "height", "")
rows.append(
[
"",
local_time,
f"{rec.lat:.5f}",
f"{rec.lng:.5f}",
str(rec.current),
str(height_val),
lightning_type,
f"{proximity:.2f}",
]
)
row_colors.append(color)
sorted_data = sorted(
zip(rows, row_colors),
key=lambda x: (0 if x[0][6] == s.lightning_type_cg else 1, float(x[0][7])),
)
if sorted_data:
rows, row_colors = zip(*sorted_data)
rows = list(rows)
row_colors = list(row_colors)
for idx, row in enumerate(rows):
row[0] = str(idx + 1)
else:
rows, row_colors = [], []
header = [
s.col_no,
s.col_time_local,
s.col_lat,
s.col_lng,
s.col_current_amps,
s.col_height_m,
s.col_lightning_type,
s.col_proximity_km,
]
return [header] + rows, ["lightgrey"] + row_colors
def build_risk_table_data(
turbine_df: pd.DataFrame,
lang: ReportLanguage | None = None,
) -> tuple[list[list[str]] | None, list[str] | None]:
if "risk_log" not in turbine_df.columns:
return None, None
s = get_strings(lang or get_report_language())
rows: list[list[str]] = []
row_colors: list[str] = []
from src.utils import get_risk_definition_by_fixed_intervals, get_turbine_color_by_fixed_intervals
for _, turbine in turbine_df.iterrows():
risk_log = float(turbine.get("risk_log", 0) or 0)
color = get_turbine_color_by_fixed_intervals(risk_log)
rows.append(
[
str(turbine.get("name", "N/A")),
f"{risk_log:.2f}",
str(get_risk_definition_by_fixed_intervals(risk_log, lang or get_report_language())),
]
)
row_colors.append(str(color))
sorted_data = sorted(zip(rows, row_colors), key=lambda x: float(x[0][1]), reverse=True)
if sorted_data:
rows, row_colors = zip(*sorted_data)
else:
rows, row_colors = [], []
header = [s.col_turbine_name, s.col_log_risk, s.col_risk_definition]
return [header] + list(rows), ["lightgrey"] + list(row_colors)