BE-LightningReport/src/reporting/gemini_commentary.py
erdemerikci 45d80dfaa6 Initial import: Lightning_Report with n8n integration
Fork of Lightning_Report adding:
- n8n_report_branch.json: workflow branch for storm-triggered report delivery
- report_service/: FastAPI microservice wrapping create_docx_report() so n8n
  can produce byte-identical reports without fighting the Python Code sandbox

Made-with: Cursor
2026-04-22 15:13:08 +03:00

210 lines
9.9 KiB
Python

from __future__ import annotations
import os
from typing import Any
from src.utils import get_risk_definition_by_fixed_intervals
def build_gemini_prompt(context: dict[str, Any]) -> str:
analysis_period = context.get("analysis_period", "N/A")
analysis_radius_km = context.get("analysis_radius_km", None)
total_events = context.get("total_events", None)
total_lightning_per_km2 = context.get("total_lightning_per_km2", None)
turbine_count = context.get("turbine_count", None)
is_single_turbine_report = context.get("is_single_turbine_report", None)
top_rings = context.get("top_rings", []) # list of (ring_name, total, cg_count, ic_count)
max_risk_log = context.get("max_risk_log", None)
max_risk_definition = context.get("max_risk_definition", None)
top_turbine_name = context.get("top_turbine_name", "N/A")
top_turbine_risk_log = context.get("top_turbine_risk_log", None)
storm_summary = context.get("storm_summary")
storm_over_turbine = context.get("storm_over_turbine")
storm_near_turbine_count = context.get("storm_near_turbine_count")
storm_closest_distance_km = context.get("storm_closest_distance_km")
storm_over_threshold_km = context.get("storm_over_threshold_km", 1.0)
ring_lines: list[str] = []
for ring in top_rings[:3]:
try:
ring_name, total, cg_count, ic_count = ring
except Exception:
continue
ring_lines.append(f"- {ring_name}: total={total}, cloud-to-ground={cg_count}, intercloud={ic_count}")
storm_lines: list[str] = []
if isinstance(storm_summary, dict) and storm_summary:
total_cells = storm_summary.get("total_cells", 0)
severity_counts = storm_summary.get("severity_counts", {}) or {}
storm_lines.append(f"- total_cells={total_cells}")
for severity, count in severity_counts.items():
storm_lines.append(f"- {severity}_cells={count}")
return (
"You are generating a single neutral, factual commentary paragraph for a lightning activity report.\n"
"Write exactly 3-4 sentences.\n"
"Do not invent any numbers. Only use the values provided.\n"
"\n"
"Risk explanation requirements (must be reflected in the paragraph):\n"
"- Turbine risk increases for cloud-to-ground strikes with larger current magnitude.\n"
"- Turbine risk decays exponentially with increasing distance from the turbine.\n"
"- The turbine risk score is the sum of per-strike contributions (then log-transformed for visualization/heatmaps/tables).\n"
"\n"
"Context:\n"
f"- analysis_period: {analysis_period}\n"
f"- analysis_radius_km: {analysis_radius_km}\n"
f"- total_events: {total_events}\n"
f"- total_lightning_per_km2: {total_lightning_per_km2}\n"
f"- turbine_count: {turbine_count}\n"
f"- is_single_turbine_report: {is_single_turbine_report}\n"
f"- top_rings:\n{chr(10).join(ring_lines) if ring_lines else '- N/A'}\n"
f"- max_risk_log: {max_risk_log}\n"
f"- max_risk_definition: {max_risk_definition}\n"
f"- top_turbine_name: {top_turbine_name}\n"
f"- top_turbine_risk_log: {top_turbine_risk_log}\n"
f"- storm_over_turbine: {storm_over_turbine}\n"
f"- storm_near_turbine_count: {storm_near_turbine_count}\n"
f"- storm_closest_distance_km: {storm_closest_distance_km}\n"
f"- storm_over_threshold_km: {storm_over_threshold_km}\n"
+ (f"\n- storm_summary:\n{chr(10).join(storm_lines)}" if storm_lines else "\n- storm_summary: not available")
+ "\n\n"
"Requirements for the paragraph:\n"
"- Mention one key takeaway from the ring distribution (e.g., where totals are highest).\n"
"- Mention the overall lightning density (events/km²).\n"
"- If is_single_turbine_report is true: start the sentence mentioning the highest-risk turbine as \"For {top_turbine_name}, ...\"; avoid wording like \"Within the analyzed area\" and avoid verbs like \"was identified\".\n"
"- If is_single_turbine_report is false: you may use wording like \"Within the analyzed area, {top_turbine_name} was identified ...\".\n"
"- Mention the specific turbine name with the highest risk score (top_turbine_name) verbatim.\n"
"- Mention the risk category for that turbine using max_risk_definition.\n"
"- Do not refer only to the category; always associate the risk with top_turbine_name.\n"
"- Mention storm-cell interaction with the turbine when storm information is available:\n"
" - If storm_over_turbine is true: say that storm cells were very close to/over the turbine (based on centroid distance <= storm_over_threshold_km).\n"
" - If storm_over_turbine is false and storm_closest_distance_km is provided: say the closest storm cell centroid came within storm_closest_distance_km km of the turbine.\n"
"- If storm_summary is available, mention total storm cells and at least one severity count.\n"
"- Round numeric values as follows (use the rounded values you are given, avoid long decimals):\n"
" - lightning density to 3 decimals (events/km²)\n"
" - log-transformed risk score(s) to 2 decimals\n"
" - distances (analysis_radius_km, storm_closest_distance_km) to 1 decimal (km)\n"
" - counts to integers\n"
"- Keep tone analytic and non-alarmist.\n"
"\n"
"Output:\n"
"One paragraph only (no bullet points, no headings)."
)
def fallback_commentary(context: dict[str, Any]) -> str:
analysis_period = context.get("analysis_period", "N/A")
analysis_radius_km = context.get("analysis_radius_km", None)
total_events = context.get("total_events", None)
total_lightning_per_km2 = context.get("total_lightning_per_km2", None)
turbine_count = context.get("turbine_count", None)
is_single_turbine_report = context.get("is_single_turbine_report", None)
top_rings = context.get("top_rings", [])
max_risk_definition = context.get("max_risk_definition", "N/A")
top_turbine_name = context.get("top_turbine_name", "N/A")
top_turbine_risk_log = context.get("top_turbine_risk_log", None)
storm_over_turbine = context.get("storm_over_turbine")
storm_closest_distance_km = context.get("storm_closest_distance_km")
storm_over_threshold_km = context.get("storm_over_threshold_km", 1.0)
storm_near_turbine_count = context.get("storm_near_turbine_count")
outermost_ring = top_rings[-1] if top_rings else None
best_ring = top_rings[0] if top_rings else None
def _format_ring(ring: Any) -> str:
try:
ring_name, total, cg_count, ic_count = ring
return f"{ring_name} (total={total}, cloud-to-ground={cg_count}, intercloud={ic_count})"
except Exception:
return "N/A"
best_ring_txt = _format_ring(best_ring)
outer_ring_txt = _format_ring(outermost_ring)
storm_summary = context.get("storm_summary") or {}
storm_line = ""
if isinstance(storm_summary, dict) and storm_summary:
total_cells = storm_summary.get("total_cells", 0)
severity_counts = storm_summary.get("severity_counts", {}) or {}
if severity_counts:
# Pick max severity to mention
severity = max(severity_counts.items(), key=lambda kv: kv[1])[0]
count = severity_counts.get(severity, 0)
storm_line = f"Storm data indicates {total_cells} storm cells, with the highest share in {severity} ({count} cells)."
else:
storm_line = f"Storm data indicates {total_cells} storm cells."
density_txt = (
f"{total_lightning_per_km2:.3f} events/km²" if isinstance(total_lightning_per_km2, (int, float)) else str(total_lightning_per_km2)
)
radius_txt = f"within {analysis_radius_km:.1f} km" if isinstance(analysis_radius_km, (int, float)) else ""
events_txt = f"{total_events} total lightning events" if total_events is not None else "N/A"
paragraph_intro = (
f"For {analysis_period}, the dataset contains {events_txt} {radius_txt}, corresponding to an overall lightning density of {density_txt}. "
f"The largest contributions are concentrated in {best_ring_txt}, with additional activity also present in {outer_ring_txt}. "
)
turbine_sentence = (
f"For {top_turbine_name}, the log-transformed risk score is the highest in this report and falls in the {max_risk_definition} category. "
if is_single_turbine_report
else f"Within the analyzed area, the turbine with the highest log-transformed risk score is {top_turbine_name}, which falls in the {max_risk_definition} category. "
)
method_sentence = (
f"This indicates the turbine was exposed to a combination of closer cloud-to-ground strikes and stronger current magnitudes. "
f"In the risk model, each cloud-to-ground strike contributes more when it is near the turbine and when |I| is larger, and contributions decrease exponentially with distance; the turbine risk score is the sum over all included strikes (with a log transform used for visualization). "
)
if storm_over_turbine:
storm_interaction_sentence = (
f"Storm interaction: storm-cell centroids came within {storm_over_threshold_km:.1f} km of the turbine (count={storm_near_turbine_count}). "
)
elif isinstance(storm_closest_distance_km, (int, float)):
storm_interaction_sentence = (
f"Storm interaction: the closest storm-cell centroid came within {storm_closest_distance_km:.1f} km of the turbine. "
)
else:
storm_interaction_sentence = ""
storm_severity_sentence = storm_line if storm_line else "Storm severity distribution is not available for this report."
paragraph = (paragraph_intro + turbine_sentence + method_sentence + storm_interaction_sentence + storm_severity_sentence).strip()
return paragraph
def generate_gemini_paragraph(context: dict[str, Any], api_key: str | None = None) -> str:
api_key_final = api_key or os.getenv("GEMINI_API_KEY")
if not api_key_final:
return fallback_commentary(context)
model_name = os.getenv("GEMINI_MODEL", "gemini-1.5-flash")
prompt = build_gemini_prompt(context)
try:
import google.generativeai as genai
genai.configure(api_key=api_key_final)
model = genai.GenerativeModel(model_name)
# Keep output short and deterministic
resp = model.generate_content(
prompt,
generation_config={
"temperature": 0.2,
"max_output_tokens": 220,
},
)
text = getattr(resp, "text", None) or ""
text = str(text).strip()
if not text:
return fallback_commentary(context)
return text
except Exception:
return fallback_commentary(context)