BE-LightningReport/src/reporting/docx_sections.py

277 lines
7.2 KiB
Python

from __future__ import annotations
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from src.config import config
from src.reporting.precompute import precompute_distances_and_rings
def _parse_chart_times(values: pd.Series) -> pd.Series:
    """Turn a column of timestamps into datetimes suitable for a chart axis.

    Two input shapes are recognised:

    * Mostly numeric columns -- at least ``int(0.7 * len(values))`` entries
      (and never fewer than one) convert to a number -- are read as epoch
      offsets. The unit is guessed from the largest absolute value:
      ``>= 1e17`` nanoseconds, ``>= 1e14`` microseconds, ``>= 1e11``
      milliseconds, otherwise seconds.
    * Everything else is passed to ``pd.to_datetime`` unchanged.

    Both paths parse with ``utc=True`` and ``errors="coerce"``, so entries
    that cannot be parsed come back as ``NaT``. The parsed series is then
    converted to ``config.timezone`` (when one is configured) and its
    timezone information is dropped.

    NOTE(review): because of ``utc=True``, timezone-naive inputs are read as
    UTC before the conversion -- confirm that is what ``local_time`` holds.

    Args:
        values: Raw timestamp column (numbers, strings or datetimes).

    Returns:
        A series of parsed timestamps in the same order as ``values``.
    """
    as_numbers = pd.to_numeric(values, errors="coerce")
    usable = as_numbers.dropna()
    required = max(1, int(len(values) * 0.7))

    if len(usable) >= required:
        magnitude = float(usable.abs().max())
        unit_floors = ((1e17, "ns"), (1e14, "us"), (1e11, "ms"))
        epoch_unit = next(
            (name for floor, name in unit_floors if magnitude >= floor), "s"
        )
        stamps = pd.to_datetime(
            as_numbers, errors="coerce", unit=epoch_unit, utc=True
        )
    else:
        stamps = pd.to_datetime(values, errors="coerce", utc=True)

    if config.timezone:
        try:
            stamps = stamps.dt.tz_convert(config.timezone)
        except Exception:
            # Best effort: keep the UTC values when the conversion fails.
            pass

    try:
        return stamps.dt.tz_localize(None)
    except Exception:
        # Best effort: hand back whatever was parsed if tz info cannot be
        # stripped.
        return stamps
def _build_current_vs_distance_chart(
    lightning_df: pd.DataFrame,
    dists_km: np.ndarray,
    mask_within: np.ndarray,
    lightning_type_filter: str,
    title: str,
    fig_width: int,
    fig_height: int,
) -> go.Figure | None:
    """Build a time-vs-current scatter chart with one trace per distance ring.

    Rows are kept when ``mask_within`` is true and the row's ``p_type``
    matches the requested lightning type. Rows whose ``local_time`` cannot be
    parsed are dropped. The remaining points are sorted by time and grouped
    into rings derived from ``config.distance_rings``.

    Args:
        lightning_df: Events with ``p_type``, ``current`` and ``local_time``
            columns.
        dists_km: One distance per row of ``lightning_df``, in row order.
            Presumably kilometres, as the name and hover text suggest.
        mask_within: Boolean mask, one entry per row, selecting the rows to
            consider.
        lightning_type_filter: ``"cg"`` keeps rows whose ``p_type`` renders
            as ``"0"``; any other value keeps all remaining rows.
        title: Chart title.
        fig_width: Figure width passed to the layout.
        fig_height: Figure height passed to the layout.

    Returns:
        The figure, or ``None`` when no row survives the filters.
    """
    is_cg = lightning_df["p_type"].astype(str) == "0"
    type_mask = is_cg if lightning_type_filter == "cg" else ~is_cg

    # Work with a plain boolean array so that row selection is positional
    # and matches the positional indexing applied to ``dists_km``.
    combined_mask = np.asarray(mask_within & type_mask, dtype=bool)
    if not combined_mask.any():
        return None

    subset = lightning_df.loc[combined_mask]
    distances = dists_km[combined_mask]
    currents = subset["current"].values.astype(float)

    time_series = _parse_chart_times(subset["local_time"])
    has_time = (~time_series.isna()).values
    if not has_time.any():
        return None
    time_values = time_series.values[has_time]
    distances = distances[has_time]
    currents = currents[has_time]

    # A stable sort keeps simultaneous events in their input order.
    order = np.argsort(time_values.astype("datetime64[ns]"), kind="stable")
    time_values = time_values[order]
    distances = distances[order]
    currents = currents[order]

    # Ring edges are divided by 1000 to obtain kilometres.
    # NOTE(review): np.searchsorted needs ascending edges -- confirm that
    # config.distance_rings is always sorted.
    rings_km = np.array(config.distance_rings, dtype=float) / 1000.0
    ring_colors_cfg = getattr(config, "ring_colors", None) or []
    ring_indices = np.searchsorted(rings_km, distances, side="left").astype(int)
    # Anything beyond the last edge is drawn with the outermost ring.
    ring_indices = np.clip(ring_indices, 0, len(rings_km) - 1)

    lower_edges = ["0"] + [f"{edge:.1f}" for edge in rings_km[:-1]]
    ring_names = [
        f"{lower}-{upper:.1f} km" for lower, upper in zip(lower_edges, rings_km)
    ]

    # Loop invariant: the same suffix is appended to every hover label.
    tz_suffix = f" ({config.timezone})" if config.timezone else ""

    fig = go.Figure()
    for i, ring_name in enumerate(ring_names):
        in_ring = ring_indices == i
        if not in_ring.any():
            continue

        color = ring_colors_cfg[i] if i < len(ring_colors_cfg) else "gray"
        r_times = time_values[in_ring]
        time_labels = [
            f"{stamp}{tz_suffix}"
            for stamp in pd.to_datetime(r_times).strftime("%d-%m-%Y %H:%M")
        ]
        # Stacking floats next to strings turns the floats into
        # full-precision strings, so format the distances explicitly.
        dist_labels = [f"{dist:.2f}" for dist in distances[in_ring]]

        fig.add_trace(
            go.Scatter(
                x=r_times,
                y=currents[in_ring],
                mode="markers",
                name=ring_name,
                marker=dict(size=10, opacity=0.8, color=color),
                customdata=np.column_stack((dist_labels, time_labels)),
                hovertemplate=(
                    "Time: %{customdata[1]}<br>"
                    "Current: %{y:.0f} A<br>"
                    "Distance: %{customdata[0]} km<br>"
                    "Ring: " + ring_name + "<br>"
                    "<extra></extra>"
                ),
            )
        )

    timezone_label = config.timezone or "UTC"
    fig.update_layout(
        font=dict(size=16),
        title=dict(text=title, x=0.5, font=dict(size=22)),
        xaxis_title=f"Time ({timezone_label})",
        yaxis_title="Current (A)",
        plot_bgcolor="white",
        paper_bgcolor="white",
        xaxis=dict(
            type="date",
            showgrid=True,
            gridcolor="lightgray",
            zeroline=False,
            tickformat="%d-%m-%Y %H:%M",
            nticks=6,
            tickangle=-25,
            tickfont=dict(size=22),
            title_font=dict(size=28),
        ),
        yaxis=dict(
            showgrid=True,
            gridcolor="lightgray",
            zeroline=False,
            tickfont=dict(size=22),
            title_font=dict(size=28),
        ),
        legend=dict(
            title="Distance Ring",
            orientation="h",
            x=0.5,
            xanchor="center",
            # Negative y places the legend below the plot area; the bottom
            # margin below leaves room for it.
            y=-0.28,
            yanchor="top",
            bgcolor="rgba(255,255,255,0.8)",
            bordercolor="black",
            borderwidth=1,
            font=dict(size=20),
            title_font=dict(size=24),
        ),
        width=fig_width,
        height=fig_height,
        margin=dict(l=70, r=40, t=50, b=130),
    )
    return fig
def build_lightning_event_table_data(
    centroid_lat: float, centroid_lng: float, lightning_df: pd.DataFrame
) -> tuple[list[list[str]], list[str]]:
    """Build the rows and row colours of the lightning event table.

    Distances and ring indices come from ``precompute_distances_and_rings``.
    Events farther away than the largest configured ring, or whose ring index
    is not below the number of rings, are left out. The remaining rows are
    ordered cloud-to-ground first, then by displayed proximity, and numbered
    from 1.

    Args:
        centroid_lat: Latitude of the reference point.
        centroid_lng: Longitude of the reference point.
        lightning_df: Events with ``local_time``, ``lat``, ``lng``,
            ``current`` and ``p_type`` columns; ``ic_height`` or ``height``
            is used when present.

    Returns:
        ``(table, colors)``: ``table`` is the header row followed by the data
        rows, and ``colors`` holds one colour per table row, starting with
        ``"lightgrey"`` for the header.
    """
    header = [
        "#",
        "Time (Local)",
        "Lat",
        "Lng",
        "Current (amps)",
        "Height (m)",
        "Lightning Type",
        "Proximity (km)",
    ]

    pre = precompute_distances_and_rings(
        centroid_lat, centroid_lng, lightning_df, config.distance_rings
    )
    # Ring edges are divided by 1000 to compare against pre["dists_km"].
    outermost_km = max(config.distance_rings) / 1000.0
    ring_count = len(config.distance_rings)
    ring_colors = getattr(config, "ring_colors", None) or []

    # Resolve the formatter once rather than on every row. A failed import
    # stays non-fatal: rows then show the raw timestamp text instead.
    try:
        from src.utils import format_datetime_ddmmyyyy_hhmmss as format_time
    except ImportError:
        format_time = None

    # Each entry is (sort key, row, colour).
    entries: list[tuple[tuple[int, float], list[str], str]] = []
    for i, rec in enumerate(lightning_df.itertuples(index=False)):
        proximity = float(pre["dists_km"][i])
        if proximity > outermost_km:
            continue
        ri = int(pre["ring_idx"][i])
        if ri >= ring_count:
            continue

        # Fall back to gray when fewer colours than rings are configured,
        # the same default the distance chart uses.
        try:
            color = ring_colors[ri]
        except IndexError:
            color = "gray"

        local_time = str(getattr(rec, "local_time", ""))[:19]
        if format_time is not None:
            try:
                raw_time = rec.local_time
                parsed = (
                    pd.to_datetime(raw_time[:19])
                    if isinstance(raw_time, str)
                    else raw_time
                )
                local_time = format_time(pd.to_datetime(parsed))
            except Exception:
                # Deliberate best effort: an unparseable timestamp must not
                # drop the row, so the truncated raw text is kept.
                pass

        is_cloud_to_ground = str(rec.p_type) == "0"
        height_val = getattr(rec, "ic_height", "")
        if height_val == "":
            height_val = getattr(rec, "height", "")

        proximity_text = f"{proximity:.2f}"
        row = [
            "",  # running number, filled in after sorting
            local_time,
            f"{rec.lat:.5f}",
            f"{rec.lng:.5f}",
            str(rec.current),
            str(height_val),
            "cloud-to-ground" if is_cloud_to_ground else "intercloud",
            proximity_text,
        ]
        # Sort on the displayed (rounded) proximity so rows that look
        # identical keep their input order.
        sort_key = (0 if is_cloud_to_ground else 1, float(proximity_text))
        entries.append((sort_key, row, color))

    entries.sort(key=lambda entry: entry[0])

    rows: list[list[str]] = []
    row_colors: list[str] = []
    for number, (_, row, color) in enumerate(entries, start=1):
        row[0] = str(number)
        rows.append(row)
        row_colors.append(color)

    return [header] + rows, ["lightgrey"] + row_colors
def build_risk_table_data(
    turbine_df: pd.DataFrame,
) -> tuple[list[list[str]] | None, list[str] | None]:
    """Build the rows and row colours of the turbine risk table.

    Each turbine contributes its name, its ``risk_log`` formatted with two
    decimals, and the risk definition returned by
    ``get_risk_definition_by_fixed_intervals``. Rows are ordered from highest
    to lowest displayed risk.

    Args:
        turbine_df: One row per turbine. ``risk_log`` is required; ``name``
            defaults to ``"N/A"`` when absent.

    Returns:
        ``(None, None)`` when ``turbine_df`` has no ``risk_log`` column.
        Otherwise ``(table, colors)``: ``table`` is the header row followed
        by the data rows, and ``colors`` holds one colour per table row,
        starting with ``"lightgrey"`` for the header.
    """
    if "risk_log" not in turbine_df.columns:
        return None, None

    from src.utils import (
        get_risk_definition_by_fixed_intervals,
        get_turbine_color_by_fixed_intervals,
    )

    entries: list[tuple[list[str], str]] = []
    for _, turbine in turbine_df.iterrows():
        raw_risk = turbine.get("risk_log", 0)
        # NaN is truthy, so ``raw_risk or 0`` alone would let it through. A
        # NaN sort key makes the ordering undefined, so treat it as missing,
        # exactly like None.
        risk_log = 0.0 if pd.isna(raw_risk) else float(raw_risk or 0)
        color = get_turbine_color_by_fixed_intervals(risk_log)
        row = [
            str(turbine.get("name", "N/A")),
            f"{risk_log:.2f}",
            str(get_risk_definition_by_fixed_intervals(risk_log)),
        ]
        entries.append((row, str(color)))

    # Order by the displayed value; rows that tie keep their input order.
    entries = sorted(entries, key=lambda entry: float(entry[0][1]), reverse=True)

    header = ["Turbine Name", "Log Risk", "Risk Definition"]
    rows = [row for row, _ in entries]
    row_colors = [color for _, color in entries]
    return [header] + rows, ["lightgrey"] + row_colors