diff --git a/report_service/adapter.py b/report_service/adapter.py index 4fc4ebc..a83b340 100644 --- a/report_service/adapter.py +++ b/report_service/adapter.py @@ -6,15 +6,13 @@ Keeping this isolated so the rest of the codebase never learns about n8n. """ from __future__ import annotations -from datetime import datetime, timezone from typing import Any import pandas as pd from src.config import config from src.reporting.strings import normalize_language - -_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items") +from src.visualization.storm_cells import normalize_storm_records _LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = { @@ -234,108 +232,3 @@ def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFram turbine_df = _build_turbine_df(payload.get("turbines") or []) lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name) return turbine_df, lightning_df - - -def _epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None: - if epoch_ms in (None, "", 0): - return None - try: - return ( - datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc) - .strftime("%Y-%m-%dT%H:%M:%SZ") - ) - except (TypeError, ValueError, OSError): - return None - - -def _exterior_to_wkt(exterior: Any) -> str | None: - """Convert iklim.co `polygon.exterior` array into a WKT LINESTRING.""" - if not isinstance(exterior, list) or not exterior: - return None - pairs: list[str] = [] - for point in exterior: - if not isinstance(point, dict): - return None - lng = point.get("lng") if "lng" in point else point.get("longitude") - lat = point.get("lat") if "lat" in point else point.get("latitude") - if lng is None or lat is None: - return None - try: - pairs.append(f"{float(lng)} {float(lat)}") - except (TypeError, ValueError): - return None - if len(pairs) < 2: - return None - return f"LINESTRING({', '.join(pairs)})" - - -def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]: - """ - Accept any of these shapes coming from n8n and return a flat list of - storm-cell dicts: - - * [ { "thunderstorms": [...] } ] (n8n HTTP item wrapper) - * { "thunderstorms": [...] } (single API response) - * [ {storm}, {storm}, ... ] (already flat) - * { "0": {storm}, "1": {storm} } (object-as-array) - """ - if raw is None: - return [] - - if isinstance(raw, list): - if raw and all(isinstance(item, dict) and any(k in item for k in _STORM_ENVELOPE_KEYS) for item in raw): - unwrapped: list[dict[str, Any]] = [] - for item in raw: - for key in _STORM_ENVELOPE_KEYS: - if key in item and isinstance(item[key], list): - unwrapped.extend(s for s in item[key] if isinstance(s, dict)) - break - return unwrapped - return [s for s in raw if isinstance(s, dict)] - - if isinstance(raw, dict): - for key in _STORM_ENVELOPE_KEYS: - if key in raw and isinstance(raw[key], list): - return [s for s in raw[key] if isinstance(s, dict)] - return [v for v in raw.values() if isinstance(v, dict)] - - return [] - - -def normalize_storm_records(raw: Any) -> list[dict[str, Any]]: - """ - Translate iklim.co `/v1/thunderstorms/within` records into the shape that - `src.reporting.docx` and `src.visualization.storm_cells` expect. - - Required output keys per record: - - cell_polygon_wkt - - effective_time / expire_time / creation_time (ISO 8601 UTC) - - lightning_severity - """ - out: list[dict[str, Any]] = [] - for record in _unwrap_storm_envelope(raw): - cell = record.get("cell") if isinstance(record.get("cell"), dict) else {} - polygon = cell.get("polygon") if isinstance(cell.get("polygon"), dict) else {} - wkt = _exterior_to_wkt(polygon.get("exterior")) if polygon else None - if not wkt: - continue - - effective = _epoch_ms_to_iso_utc(record.get("eventStartUtcEpoch")) - expire = _epoch_ms_to_iso_utc(record.get("eventEndUtcEpoch")) - created = _epoch_ms_to_iso_utc(record.get("insertedAtEpoch")) - - normalized = dict(record) - normalized["cell_polygon_wkt"] = wkt - if effective: - normalized["effective_time"] = effective - if expire: - normalized["expire_time"] = expire - if created: - normalized["creation_time"] = created - - severity = record.get("severity") or record.get("lightning_severity") - if severity is not None: - normalized["lightning_severity"] = str(severity).strip().lower() - - out.append(normalized) - return out diff --git a/report_service/main.py b/report_service/main.py index 97ec7dd..8169fe3 100644 --- a/report_service/main.py +++ b/report_service/main.py @@ -169,10 +169,18 @@ def _build_docx_file(payload: dict[str, Any]) -> tuple[str, str, int]: apply_farm_config(payload) turbine_df, lightning_df = build_dataframes(payload) - storm_records = normalize_storm_records(payload.get("storm_records")) or None - if storm_records is not None: + raw_storm_records = payload.get("storm_records") + storm_records = normalize_storm_records(raw_storm_records) + if raw_storm_records and not storm_records: + logger.warning( + "Received storm_records for %s but none could be normalized for rendering", + customer_name, + ) + elif storm_records: logger.info("Normalized %d storm record(s) for %s", len(storm_records), customer_name) + storm_records = storm_records or None + filename = _build_filename(payload) tmp_fd, tmp_path = tempfile.mkstemp(suffix=".docx") os.close(tmp_fd) diff --git a/src/reporting/docx.py b/src/reporting/docx.py index eb54b1d..62b992a 100644 --- a/src/reporting/docx.py +++ b/src/reporting/docx.py @@ -54,8 +54,9 @@ from src.visualization.storm_cells import ( filter_storm_data_by_date_range, filter_storm_data_by_turbine_proximity, load_storm_data_from_json, + normalize_storm_records, ) -from src.reporting.gemini_commentary import generate_gemini_paragraph +from src.reporting.gemini_commentary import generate_gemini_paragraph, generate_gemini_recommendation from src.reporting.strings import get_report_language, get_strings from src.utils import get_risk_definition_by_fixed_intervals @@ -224,7 +225,7 @@ def _build_key_findings( s, storm_cell_count: int = 0, storm_closest_to_centroid_km: float | None = None, -) -> tuple[list[str], str]: +) -> list[str]: radius_m = get_analysis_radius_m() if radius_m <= 0: radius_m = int(max(config.distance_rings)) if config.distance_rings else 0 @@ -302,14 +303,7 @@ def _build_key_findings( s.kf_storm_distance.format(count=storm_cell_count, distance=storm_closest_to_centroid_km) ) - if total_events == 0: - recommendation = s.recommendation_no_activity - elif high_plus_count > 0: - recommendation = s.recommendation_high_plus.format(count=high_plus_count, total=turbine_count) - else: - recommendation = s.recommendation_routine - - return bullets, recommendation + return bullets def _lighten_hex_color(hex_color: str, factor: float = 0.6) -> str: @@ -720,23 +714,9 @@ def create_docx_report( # Optional storm data load/filter (same behavior as PDF) storm_data: list[dict[str, Any]] = [] if storm_data_records is not None: - # `storm_records` may come back as: - # - list[dict] (expected), or - # - dict[id, list[dict]] (common for APIs) - # If we naively do `list(storm_data_records)` we would get dict keys (strings), - # which later breaks `.get(...)` access in storm filtering helpers. - if isinstance(storm_data_records, list): - storm_data = [s for s in storm_data_records if isinstance(s, dict)] - else: - normalized: list[dict[str, Any]] = [] - for value in storm_data_records.values(): - if isinstance(value, dict): - normalized.append(value) - elif isinstance(value, list): - normalized.extend([item for item in value if isinstance(item, dict)]) - storm_data = normalized + storm_data = normalize_storm_records(storm_data_records) elif storm_data_path: - storm_data = load_storm_data_from_json(storm_data_path) + storm_data = normalize_storm_records(load_storm_data_from_json(storm_data_path)) if storm_data: storm_data = filter_storm_data_by_date_range(storm_data, start_date, end_date) @@ -900,6 +880,9 @@ def create_docx_report( "storm_cell_count": storm_cell_count, "storm_closest_to_centroid_km": round(float(storm_closest_to_centroid_km), 1) if storm_closest_to_centroid_km is not None else None, "turbine_risk_counts": turbine_risk_counts, + "high_plus_count": int((pd.to_numeric(turbine_df["risk_log"], errors="coerce") >= 0.8).sum()) + if "risk_log" in turbine_df.columns and len(turbine_df) > 0 + else 0, "storm_summary": storm_summary, } @@ -910,7 +893,7 @@ def create_docx_report( if commentary_text.startswith(prefix): commentary_text = commentary_text[len(prefix):].strip() break - key_findings, recommendation_text = _build_key_findings( + key_findings = _build_key_findings( lightning_df, turbine_df, stats, @@ -920,6 +903,7 @@ def create_docx_report( storm_cell_count=storm_cell_count, storm_closest_to_centroid_km=storm_closest_to_centroid_km, ) + recommendation_text = generate_gemini_recommendation(commentary_context) # Keep the commentary close to the Turbine Information table (same page if possible). doc.add_paragraph("") diff --git a/src/reporting/gemini_commentary.py b/src/reporting/gemini_commentary.py index b7ed0c7..becb96d 100644 --- a/src/reporting/gemini_commentary.py +++ b/src/reporting/gemini_commentary.py @@ -350,6 +350,122 @@ def fallback_commentary(context: dict[str, Any], language: ReportLanguage | None return (paragraph_intro + turbine_sentence + method_sentence + storm_interaction_sentence + storm_severity_sentence).strip() +def build_gemini_recommendation_prompt(context: dict[str, Any], language: ReportLanguage | None = None) -> str: + lang = language or get_report_language() + s = get_strings(lang) + + total_events = context.get("total_events", 0) + turbine_count = context.get("turbine_count", 0) + high_plus_count = context.get("high_plus_count", 0) + top_turbine_name = context.get("top_turbine_name", "N/A") + max_risk_definition = context.get("max_risk_definition", "N/A") + turbine_risk_counts = context.get("turbine_risk_counts", {}) or {} + + if s.gemini_write_turkish: + return ( + "Yıldırım aktivite raporu için tek bir kısa öneri yaz.\n" + "En fazla 2 cümle. Akıcı ve doğal Türkçe kullan.\n" + "Sayı uydurma. Yalnızca verilen değerleri kullan.\n" + "\n" + "Üslup:\n" + "- Analitik, net ve alarmist olmayan.\n" + "- \"yıldırımdan korunma sistemi\", \"paratoner\" veya benzeri spesifik ekipman ifadeleri kullanma.\n" + "- Yüksek riskli türbinler varsa genel operasyonel bir öneri sun; örneğin \"teknik inceleme yapılması önerilir\".\n" + "- Yüksek risk yoksa rutin izlemenin yeterli olduğunu belirt.\n" + "- Olay yoksa bu dönemde aktivite kaydedilmediğini belirt.\n" + "\n" + "Bağlam:\n" + f"- total_events: {total_events}\n" + f"- turbine_count: {turbine_count}\n" + f"- high_plus_count: {high_plus_count}\n" + f"- top_turbine_name: {top_turbine_name}\n" + f"- max_risk_definition: {max_risk_definition}\n" + f"- turbine_risk_counts: {turbine_risk_counts}\n" + "\n" + "Çıktı:\n" + "Yalnızca öneri metni (başlık, madde işareti veya \"Öneri:\" etiketi yok)." + ) + + return ( + "Write a single short recommendation for a lightning activity report.\n" + "Maximum 2 sentences. Do not invent numbers; use only the values provided.\n" + "\n" + "Tone: analytic, clear, and non-alarmist.\n" + "Do not mention lightning protection systems, surge arresters, or specific equipment.\n" + "If high-risk turbines exist, recommend a general operational action such as technical inspection.\n" + "If no high-risk turbines exist, state that routine monitoring is sufficient.\n" + "If there were no events, state that no activity was recorded in the period.\n" + "\n" + "Context:\n" + f"- total_events: {total_events}\n" + f"- turbine_count: {turbine_count}\n" + f"- high_plus_count: {high_plus_count}\n" + f"- top_turbine_name: {top_turbine_name}\n" + f"- max_risk_definition: {max_risk_definition}\n" + f"- turbine_risk_counts: {turbine_risk_counts}\n" + "\n" + "Output:\n" + "Recommendation text only (no heading, bullet points, or \"Recommendation:\" label)." + ) + + +def fallback_recommendation(context: dict[str, Any], language: ReportLanguage | None = None) -> str: + lang = language or get_report_language() + s = get_strings(lang) + + total_events = int(context.get("total_events", 0) or 0) + high_plus_count = int(context.get("high_plus_count", 0) or 0) + turbine_count = int(context.get("turbine_count", 0) or 0) + + if total_events == 0: + return s.recommendation_no_activity + if high_plus_count > 0: + return s.recommendation_high_plus.format(count=high_plus_count, total=turbine_count) + return s.recommendation_routine + + +def generate_gemini_recommendation(context: dict[str, Any], api_key: str | None = None) -> str: + lang = get_report_language() + api_key_final = api_key or os.getenv("GEMINI_API_KEY") + if not api_key_final: + return fallback_recommendation(context, lang) + + model_name = os.getenv("GEMINI_MODEL", "gemini-1.5-flash") + prompt = build_gemini_recommendation_prompt(context, lang) + + try: + import google.generativeai as genai + + genai.configure(api_key=api_key_final) + model = genai.GenerativeModel(model_name) + + resp = model.generate_content( + prompt, + generation_config={ + "temperature": 0.2, + "max_output_tokens": 120, + }, + ) + + text = getattr(resp, "text", None) or "" + text = str(text).strip() + if not text: + return fallback_recommendation(context, lang) + + s = get_strings(lang) + for prefix in s.commentary_prefixes: + if text.startswith(prefix): + text = text[len(prefix):].strip() + break + if text.lower().startswith("öneri:"): + text = text[6:].strip() + if text.lower().startswith("recommendation:"): + text = text[15:].strip() + return text + except Exception: + return fallback_recommendation(context, lang) + + def generate_gemini_paragraph(context: dict[str, Any], api_key: str | None = None) -> str: lang = get_report_language() api_key_final = api_key or os.getenv("GEMINI_API_KEY") diff --git a/src/reporting/strings.py b/src/reporting/strings.py index 0a4859d..0578d54 100644 --- a/src/reporting/strings.py +++ b/src/reporting/strings.py @@ -268,7 +268,7 @@ _STRINGS_EN = ReportStrings( kf_active_days="Days with recorded activity: {days}", kf_storm_distance="Storm cells detected: {count} (closest {distance:.1f} km from the centroid)", recommendation_heading="Recommendation", - recommendation_high_plus="{count} of {total} turbines fall in the High risk category or above. Inspection of the lightning protection systems on these turbines is recommended.", + recommendation_high_plus="{count} of {total} turbines fall in the High risk category or above. Technical inspection of these turbines is recommended.", recommendation_routine="No turbines fall in the High risk category or above; routine monitoring is sufficient for this period.", recommendation_no_activity="No lightning activity was recorded within the analysis area during this period.", cloud_to_ground_lightnings="Cloud-to-Ground Lightnings", @@ -505,7 +505,7 @@ _STRINGS_TR = ReportStrings( kf_active_days="Aktivite görülen gün sayısı: {days}", kf_storm_distance="Tespit edilen fırtına hücresi: {count} (merkeze en yakın {distance:.1f} km)", recommendation_heading="Öneri", - recommendation_high_plus="{total} türbinden {count} adedi Yüksek risk ve üzeri sınıfta yer almaktadır. Bu türbinlerin yıldırımdan korunma sistemlerinin incelenmesi önerilir.", + recommendation_high_plus="{total} türbinden {count} adedi Yüksek risk ve üzeri sınıfta yer almaktadır. Bu türbinler için teknik inceleme yapılması önerilir.", recommendation_routine="Yüksek risk ve üzeri sınıfta türbin bulunmamaktadır; bu dönem için rutin izleme yeterlidir.", recommendation_no_activity="Bu dönemde analiz alanında yıldırım-şimşek aktivitesi kaydedilmemiştir.", cloud_to_ground_lightnings="Yıldırım", diff --git a/src/utils.py b/src/utils.py index c9801db..91193a5 100644 --- a/src/utils.py +++ b/src/utils.py @@ -139,6 +139,18 @@ def get_analysis_radius_m() -> int: return int(boundary) return outermost_ring + +def get_storm_monitoring_radius_km() -> float: + """Outer radius used when filtering thunderstorm cells for the report.""" + from .config import config + boundary = config.analysis_boundary_m + if isinstance(boundary, (int, float)) and boundary > 0: + return float(boundary) / 1000.0 + rings = config.distance_rings or [] + if rings: + return float(max(rings)) / 1000.0 + return 50.0 + def get_turbine_color_by_fixed_intervals(risk_log_value: float) -> str: """ Get turbine color based on fixed risk score intervals. diff --git a/src/visualization/maps.py b/src/visualization/maps.py index fd49439..795c659 100644 --- a/src/visualization/maps.py +++ b/src/visualization/maps.py @@ -13,7 +13,7 @@ COORDINATE_PLANE_LIGHTNING_SIZE_MAX = 24 COORDINATE_PLANE_LIGHTNING_CURRENT_SCALE = 800 COORDINATE_PLANE_TURBINE_NAME_FONT_SIZE = 9 COORDINATE_PLANE_TURBINE_TEXT_POSITION = 'middle center' -TURBINE_REFERENCE_NAME_FONT_SIZE = 11 +TURBINE_REFERENCE_NAME_FONT_SIZE = 8 TURBINE_REFERENCE_MARKER_SIZE = 22 TURBINE_REFERENCE_MIN_TURBINES = 4 TURBINE_REFERENCE_MIN_SEPARATION_M = 800.0 diff --git a/src/visualization/storm_cells.py b/src/visualization/storm_cells.py index af7f577..0e5c334 100644 --- a/src/visualization/storm_cells.py +++ b/src/visualization/storm_cells.py @@ -2,7 +2,7 @@ import plotly.graph_objects as go import plotly.express as px import numpy as np import pandas as pd -from datetime import datetime +from datetime import datetime, timezone import json from typing import List, Dict, Tuple, Any import re @@ -11,7 +11,7 @@ from zoneinfo import ZoneInfo from src.analysis.geospatial import haversine_distance from src.config import config from src.reporting.strings import ReportLanguage, get_report_language, get_strings -from src.utils import parse_period_string_to_datetime +from src.utils import get_storm_monitoring_radius_km, parse_period_string_to_datetime from src.visualization.basemap import add_satellite_basemap from src.visualization.maps import ( COORDINATE_PLANE_MARGIN_B, @@ -44,25 +44,34 @@ def format_datetime_for_display(datetime_str: str) -> str: def parse_wkt_linestring(wkt_string: str) -> List[Tuple[float, float]]: """ - Parse WKT LINESTRING format to extract coordinates. + Parse WKT LINESTRING or POLYGON format to extract coordinates. Args: - wkt_string: WKT string in format "LINESTRING(lon1 lat1, lon2 lat2, ...)" + wkt_string: WKT string in format "LINESTRING(lon1 lat1, ...)" or "POLYGON((...))" Returns: List of (longitude, latitude) tuples """ + if not wkt_string or not isinstance(wkt_string, str): + return [] + wkt = wkt_string.strip() + upper = wkt.upper() try: - # Extract coordinates from LINESTRING format - # Remove "LINESTRING(" and ")" and split by commas - coords_str = wkt_string.replace("LINESTRING(", "").replace(")", "") - coord_pairs = coords_str.split(",") - + if upper.startswith("POLYGON"): + inner = wkt[wkt.index("(") + 1 : wkt.rindex(")")] + if inner.startswith("("): + inner = inner[1 : inner.rindex(")")] + coords_str = inner + elif upper.startswith("LINESTRING"): + coords_str = wkt[wkt.index("(") + 1 : wkt.rindex(")")] + else: + return [] + coordinates = [] - for pair in coord_pairs: - lon, lat = pair.strip().split() - coordinates.append((float(lon), float(lat))) - + for pair in coords_str.split(","): + parts = pair.strip().split() + if len(parts) >= 2: + coordinates.append((float(parts[0]), float(parts[1]))) return coordinates except Exception as e: print(f"Error parsing WKT: {e}") @@ -543,18 +552,20 @@ def filter_storm_data_by_date_range(storm_data: List[Dict], start_date: str, end filtered_data = [] for storm in storm_data: time_field = storm.get('effective_time') or storm.get('creation_time') or storm.get('expire_time', '') - if time_field: - try: - storm_ts = pd.to_datetime(time_field, utc=True) - if tz is not None: - storm_ts = storm_ts.tz_convert(tz).tz_localize(None) - storm_dt = storm_ts.to_pydatetime() - else: - storm_dt = storm_ts.to_pydatetime().replace(tzinfo=None) - if start_dt <= storm_dt <= end_dt: - filtered_data.append(storm) - except Exception: - continue + if not time_field: + filtered_data.append(storm) + continue + try: + storm_ts = pd.to_datetime(time_field, utc=True) + if tz is not None: + storm_ts = storm_ts.tz_convert(tz).tz_localize(None) + storm_dt = storm_ts.to_pydatetime() + else: + storm_dt = storm_ts.to_pydatetime().replace(tzinfo=None) + if start_dt <= storm_dt <= end_dt: + filtered_data.append(storm) + except Exception: + filtered_data.append(storm) return filtered_data except Exception as e: print(f"Error filtering storm data: {e}") @@ -573,12 +584,13 @@ def filter_storm_data_by_turbine_proximity(storm_data: List[Dict], turbine_df: p Filtered list of storm cell dictionaries """ if max_distance_km is None: - # Use the farthest distance ring from config (convert meters to km) - max_distance_km = max(config.distance_rings) / 1000 + max_distance_km = get_storm_monitoring_radius_km() - print(f"🌩️ Filtering storm cells within {max_distance_km} km of turbine locations...") + print(f"🌩️ Filtering storm cells within {max_distance_km} km of farm/turbine locations...") filtered_storms = [] + centroid_lat = getattr(config, "centroid_lat", None) + centroid_lon = getattr(config, "centroid_lon", None) for storm in storm_data: wkt_string = storm.get('cell_polygon_wkt', '') @@ -589,7 +601,6 @@ def filter_storm_data_by_turbine_proximity(storm_data: List[Dict], turbine_df: p if not coords: continue - # Check if any point in the storm cell is within the distance threshold storm_within_range = False for storm_lon, storm_lat in coords: @@ -605,6 +616,19 @@ def filter_storm_data_by_turbine_proximity(storm_data: List[Dict], turbine_df: p if storm_within_range: break + + if not storm_within_range and centroid_lat is not None and centroid_lon is not None: + cell_centroid = calculate_storm_cell_centroid(wkt_string) + if cell_centroid is not None: + c_lat, c_lon = cell_centroid + if haversine_distance(centroid_lat, centroid_lon, c_lat, c_lon) / 1000 <= max_distance_km: + storm_within_range = True + + if not storm_within_range: + for storm_lon, storm_lat in coords: + if haversine_distance(centroid_lat, centroid_lon, storm_lat, storm_lon) / 1000 <= max_distance_km: + storm_within_range = True + break if storm_within_range: filtered_storms.append(storm) @@ -696,4 +720,219 @@ def create_storm_cells_summary(storm_data: List[Dict]) -> Dict[str, Any]: 'avg_speed': avg_speed, 'date_range': {'start': start_date, 'end': end_date}, 'daily_breakdown': daily_summary - } \ No newline at end of file + } + + +_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items") + + +def _storm_epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None: + if epoch_ms in (None, "", 0): + return None + try: + return ( + datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%SZ") + ) + except (TypeError, ValueError, OSError): + return None + + +def _storm_point_to_lng_lat(point: Any) -> tuple[float | None, float | None]: + if isinstance(point, dict): + lng = point.get("lng") if "lng" in point else point.get("longitude") or point.get("lon") + lat = point.get("lat") if "lat" in point else point.get("latitude") + elif isinstance(point, (list, tuple)) and len(point) >= 2: + lng, lat = point[0], point[1] + else: + return None, None + try: + return float(lng), float(lat) + except (TypeError, ValueError): + return None, None + + +def _storm_exterior_to_wkt(exterior: Any) -> str | None: + if not isinstance(exterior, list) or not exterior: + return None + pairs: list[str] = [] + for point in exterior: + lng, lat = _storm_point_to_lng_lat(point) + if lng is None or lat is None: + return None + pairs.append(f"{lng} {lat}") + if len(pairs) < 2: + return None + return f"LINESTRING({', '.join(pairs)})" + + +def _storm_normalize_wkt_string(wkt: Any) -> str | None: + if not wkt or not isinstance(wkt, str): + return None + wkt = wkt.strip() + if not wkt: + return None + upper = wkt.upper() + if upper.startswith("POLYGON"): + inner = wkt[wkt.index("(") + 1 : wkt.rindex(")")] + if inner.startswith("("): + inner = inner[1 : inner.rindex(")")] + return f"LINESTRING({inner})" if inner else None + if upper.startswith("LINESTRING"): + return wkt + return None + + +def _storm_exterior_from_polygon_obj(polygon: Any) -> Any: + if not isinstance(polygon, dict): + return None + for key in ("exterior", "exteriorRing", "exterior_ring", "coordinates", "coords"): + value = polygon.get(key) + if value: + return value + return None + + +def _storm_extract_wkt(record: dict[str, Any]) -> str | None: + for key in ("cell_polygon_wkt", "cellPolygonWkt", "polygon_wkt", "polygonWkt"): + existing = _storm_normalize_wkt_string(record.get(key)) + if existing: + return existing + + cell = record.get("cell") if isinstance(record.get("cell"), dict) else {} + polygon_sources = [ + cell.get("polygon"), + cell.get("threatPolygon"), + record.get("threatPolygon"), + record.get("polygon"), + ] + for polygon in polygon_sources: + if isinstance(polygon, list): + wkt = _storm_exterior_to_wkt(polygon) + if wkt: + return wkt + if isinstance(polygon, dict): + wkt = _storm_exterior_to_wkt(_storm_exterior_from_polygon_obj(polygon)) + if wkt: + return wkt + return None + + +def _storm_first_iso_timestamp(record: dict[str, Any], keys: tuple[str, ...]) -> str | None: + for key in keys: + value = record.get(key) + if value in (None, ""): + continue + if isinstance(value, (int, float)) or (isinstance(value, str) and value.isdigit()): + iso = _storm_epoch_ms_to_iso_utc(value) + if iso: + return iso + if isinstance(value, str): + parsed = pd.to_datetime(value, utc=True, errors="coerce") + if pd.notna(parsed): + return parsed.strftime("%Y-%m-%dT%H:%M:%SZ") + return None + + +def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]: + if raw is None: + return [] + + if isinstance(raw, list): + if raw and all( + isinstance(item, dict) and any(k in item for k in _STORM_ENVELOPE_KEYS) + for item in raw + ): + unwrapped: list[dict[str, Any]] = [] + for item in raw: + for key in _STORM_ENVELOPE_KEYS: + if key in item and isinstance(item[key], list): + unwrapped.extend(s for s in item[key] if isinstance(s, dict)) + break + return unwrapped + return [s for s in raw if isinstance(s, dict)] + + if isinstance(raw, dict): + for key in _STORM_ENVELOPE_KEYS: + if key in raw and isinstance(raw[key], list): + return [s for s in raw[key] if isinstance(s, dict)] + return [v for v in raw.values() if isinstance(v, dict)] + + return [] + + +def normalize_storm_records(raw: Any) -> list[dict[str, Any]]: + """ + Translate iklim.co `/v1/thunderstorms/within` records into the shape that + reporting and visualization code expect. + """ + records = _unwrap_storm_envelope(raw) + if not records: + return [] + + out: list[dict[str, Any]] = [] + skipped = 0 + for record in records: + wkt = _storm_extract_wkt(record) + if not wkt: + skipped += 1 + continue + + effective = _storm_first_iso_timestamp( + record, + ( + "effective_time", + "eventStartUtcEpoch", + "eventStartEpoch", + "startTimeEpoch", + "eventStartUtc", + "effectiveTime", + "startTime", + ), + ) + expire = _storm_first_iso_timestamp( + record, + ( + "expire_time", + "eventEndUtcEpoch", + "eventEndEpoch", + "endTimeEpoch", + "eventEndUtc", + "expireTime", + "endTime", + ), + ) + created = _storm_first_iso_timestamp( + record, + ( + "creation_time", + "insertedAtEpoch", + "insertedEpoch", + "createdAtEpoch", + "insertedAtUtc", + "creationTime", + "createdAt", + ), + ) + + normalized = dict(record) + normalized["cell_polygon_wkt"] = wkt + if effective: + normalized["effective_time"] = effective + if expire: + normalized["expire_time"] = expire + if created: + normalized["creation_time"] = created + + severity = record.get("severity") or record.get("lightning_severity") + if severity is not None: + normalized["lightning_severity"] = str(severity).strip().lower() + + out.append(normalized) + + if records and not out: + print(f"Warning: received {len(records)} storm record(s) but none had a usable polygon") + elif skipped: + print(f"Warning: skipped {skipped}/{len(records)} storm record(s) without polygon data") + + return out \ No newline at end of file