Remove n8n_report_branch.json and integrate storm record normalization in report_service. Adapter now includes functions for converting epoch timestamps to ISO format and unwrapping storm data from various API response shapes. Updated main.py to utilize the new normalization function for storm records.

This commit is contained in:
erdemerikci 2026-04-29 16:09:11 +03:00
parent 5d8c08dc86
commit 543c316842
3 changed files with 112 additions and 199 deletions

View File

@ -1,197 +0,0 @@
{
"name": "Lightning Report Branch (paste into Lightning_Report_Automatic)",
"nodes": [
{
"parameters": {
"assignments": {
"assignments": [
{ "id": "rpt-cid", "name": "customer_id", "value": "={{ $('Loop Over Items').item.json.id }}", "type": "string" },
{ "id": "rpt-cname", "name": "customer_name", "value": "={{ $('Loop Over Items').item.json.customer_name }}", "type": "string" },
{ "id": "rpt-tz", "name": "timezone", "value": "={{ $('Loop Over Items').item.json.timezone || 'Europe/Istanbul' }}", "type": "string" },
{ "id": "rpt-clat", "name": "centroid_lat", "value": "={{ $('Centroid & Distance Ring calculation').item.json.centroid_latitude }}", "type": "number" },
{ "id": "rpt-clon", "name": "centroid_lon", "value": "={{ $('Centroid & Distance Ring calculation').item.json.centroid_longitude }}", "type": "number" },
{ "id": "rpt-bnd", "name": "boundary_m", "value": "={{ $('Centroid & Distance Ring calculation').item.json.monitoring_boundary_m }}","type": "number" },
{ "id": "rpt-rings", "name": "rings", "value": "={{ $('Centroid & Distance Ring calculation').item.json.rings }}", "type": "object" },
{ "id": "rpt-rcolors", "name": "ring_colors", "value": "={{ [\"#B71C1C\", \"#F94144\", \"#F8961E\", \"#90BE6D\"] }}", "type": "array" },
{ "id": "rpt-ts", "name": "t_start", "value": "={{ $('Logic Gate').item.json.tStart }}", "type": "number" },
{ "id": "rpt-te", "name": "t_end", "value": "={{ $('Logic Gate').item.json.tLast }}", "type": "number" },
{ "id": "rpt-ns", "name": "n_strikes", "value": "={{ $('Logic Gate').item.json.allStrikes.length }}", "type": "number" },
{ "id": "rpt-strikes", "name": "strikes", "value": "={{ $('Logic Gate').item.json.allStrikes }}", "type": "array" },
{ "id": "rpt-turbines", "name": "turbines", "value": "={{ $('Get Customer Wind Turbines').all().map(t => t.json) }}", "type": "array" }
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [3440, 480],
"id": "a1a1a1a1-0001-4a01-8a01-000000000001",
"name": "Report: Gather Inputs"
},
{
"parameters": {
"jsCode": "const crypto = require('crypto');\nconst HMAC_SECRET = 'c88f845bd6d520ded507ef6b02efc223019ccf68f41d9070705712d480ba5166';\nconst URI = '/v1/thunderstorms/within';\n\nconst ctx = $('Report: Gather Inputs').item.json;\nconst auth = $('Restore Credentials').all().pop().json;\n\nif (!ctx.t_start || !ctx.t_end) {\n throw new Error('Missing storm timestamps from Logic Gate.');\n}\n\nconst durationSeconds = Math.max(600, Math.floor((ctx.t_end - ctx.t_start) / 1000));\nconst timestamp = Date.now().toString();\n\nconst bodyPayload = {\n latitude: Number(Number(ctx.centroid_lat).toFixed(6)),\n longitude: Number(Number(ctx.centroid_lon).toFixed(6)),\n radius: parseInt(ctx.boundary_m, 10),\n backwardInterval: durationSeconds,\n endTimeEpoch: Number(ctx.t_end),\n intersectsWith: 'THREAT_POLYGON',\n pageNumber: 0,\n pageSize: 100\n};\n\nconst bodyString = JSON.stringify(bodyPayload);\nconst dataToSign = `POST|${URI}|${timestamp}|${bodyString}`;\nconst signature = crypto.createHmac('sha256', HMAC_SECRET).update(dataToSign).digest('hex').toLowerCase();\n\nreturn [{\n json: {\n requestBody: bodyPayload,\n headers: {\n 'X-Signature': signature,\n 'X-Timestamp': timestamp,\n 'X-Nonce': crypto.randomUUID(),\n 'X-Idempotency-Key': crypto.randomUUID(),\n 'Authorization': 'Bearer ' + auth.accessToken,\n 'Content-Type': 'application/json'\n }\n }\n}];"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [3680, 480],
"id": "a2a2a2a2-0002-4a02-8a02-000000000002",
"name": "Report: Calc Thunderstorm Headers"
},
{
"parameters": {
"method": "POST",
"url": "https://api-test.iklim.co/v1/thunderstorms/within",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{ "name": "Authorization", "value": "={{ $json.headers.Authorization }}" },
{ "name": "X-Signature", "value": "={{ $json.headers['X-Signature'] }}" },
{ "name": "X-Timestamp", "value": "={{ $json.headers['X-Timestamp'] }}" },
{ "name": "X-Nonce", "value": "={{ $json.headers['X-Nonce'] }}" },
{ "name": "X-Idempotency-Key", "value": "={{ $json.headers['X-Idempotency-Key'] }}" },
{ "name": "Content-Type", "value": "={{ $json.headers['Content-Type'] }}" },
{ "name": "Accept", "value": "={{ $json.headers['Content-Type'] }}" }
]
},
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ $json.requestBody }}",
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.3,
"position": [3920, 480],
"id": "a3a3a3a3-0003-4a03-8a03-000000000003",
"name": "Report: Fetch Thunderstorms",
"onError": "continueRegularOutput"
},
{
"parameters": {
"method": "POST",
"url": "=https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={{ $env.GEMINI_API_KEY }}",
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ { contents: [ { parts: [ { text: 'Sen bir yildirim risk analisti raportorusun. Asagidaki verileri kullanarak Turkce, 2-3 paragraf, profesyonel bir ozet yaz. Sayilari dogal dilde ver; asiri teknik olma; santral adini kullan; sure, toplam darbe ve yogunluk hakkinda yorum yap.\\n\\nSantral: ' + $('Report: Gather Inputs').item.json.customer_name + '\\nZaman dilimi: ' + $('Report: Gather Inputs').item.json.timezone + '\\nFirtina baslangici (epoch ms): ' + $('Report: Gather Inputs').item.json.t_start + '\\nFirtina bitisi (epoch ms): ' + $('Report: Gather Inputs').item.json.t_end + '\\nToplam yildirim darbesi: ' + $('Report: Gather Inputs').item.json.n_strikes + '\\nIzleme yaricapi (m): ' + $('Report: Gather Inputs').item.json.boundary_m } ] } ] } }}",
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.3,
"position": [4160, 480],
"id": "a4a4a4a4-0004-4a04-8a04-000000000004",
"name": "Report: Gemini Commentary",
"onError": "continueRegularOutput"
},
{
"parameters": {
"assignments": {
"assignments": [
{ "id": "bp-cid", "name": "customer_id", "value": "={{ $('Report: Gather Inputs').item.json.customer_id }}", "type": "string" },
{ "id": "bp-cname", "name": "customer_name", "value": "={{ $('Report: Gather Inputs').item.json.customer_name }}", "type": "string" },
{ "id": "bp-tz", "name": "timezone", "value": "={{ $('Report: Gather Inputs').item.json.timezone }}", "type": "string" },
{ "id": "bp-clat", "name": "centroid_lat", "value": "={{ $('Report: Gather Inputs').item.json.centroid_lat }}", "type": "number" },
{ "id": "bp-clon", "name": "centroid_lon", "value": "={{ $('Report: Gather Inputs').item.json.centroid_lon }}", "type": "number" },
{ "id": "bp-bnd", "name": "boundary_m", "value": "={{ $('Report: Gather Inputs').item.json.boundary_m }}", "type": "number" },
{ "id": "bp-rings", "name": "rings", "value": "={{ $('Report: Gather Inputs').item.json.rings }}", "type": "object" },
{ "id": "bp-rcolors", "name": "ring_colors", "value": "={{ $('Report: Gather Inputs').item.json.ring_colors }}", "type": "array" },
{ "id": "bp-ts", "name": "t_start", "value": "={{ $('Report: Gather Inputs').item.json.t_start }}", "type": "number" },
{ "id": "bp-te", "name": "t_end", "value": "={{ $('Report: Gather Inputs').item.json.t_end }}", "type": "number" },
{ "id": "bp-ns", "name": "n_strikes", "value": "={{ $('Report: Gather Inputs').item.json.n_strikes }}", "type": "number" },
{ "id": "bp-strikes", "name": "strikes", "value": "={{ $('Report: Gather Inputs').item.json.strikes }}", "type": "array" },
{ "id": "bp-turbines", "name": "turbines", "value": "={{ $('Report: Gather Inputs').item.json.turbines }}", "type": "array" },
{ "id": "bp-gem", "name": "gemini_text", "value": "={{ $('Report: Gemini Commentary').item.json?.candidates?.[0]?.content?.parts?.[0]?.text || '' }}", "type": "string" },
{ "id": "bp-storms", "name": "storm_records", "value": "={{ $('Report: Fetch Thunderstorms').item.json?.thunderstorms || $('Report: Fetch Thunderstorms').item.json?.data || [] }}", "type": "array" }
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [4320, 480],
"id": "a8a8a8a8-0008-4a08-8a08-000000000008",
"name": "Report: Build Payload"
},
{
"parameters": {
"method": "POST",
"url": "={{ $env.REPORT_SERVICE_URL || 'http://report-service:8000' }}/generate",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{ "name": "Content-Type", "value": "application/json" },
{ "name": "Accept", "value": "application/vnd.openxmlformats-officedocument.wordprocessingml.document" }
]
},
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ $json }}",
"options": {
"timeout": 300000,
"response": {
"response": {
"responseFormat": "file",
"outputPropertyName": "report"
}
}
}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.3,
"position": [4560, 480],
"id": "a5a5a5a5-0005-4a05-8a05-000000000005",
"name": "Report: Generate DOCX"
},
{
"parameters": {
"resource": "file",
"operation": "upload",
"binaryData": true,
"binaryPropertyName": "report",
"channelId": {
"__rl": true,
"value": "REPLACE_WITH_USER_ID",
"mode": "id",
"cachedResultName": "DM target user"
},
"options": {
"fileName": "={{ $binary.report.fileName || ($('Report: Build Payload').item.json.customer_name + '_report.docx') }}",
"initialComment": "={{ '⚡ ' + $('Report: Build Payload').item.json.customer_name + ' — yeni firtina raporu (' + $('Report: Build Payload').item.json.n_strikes + ' darbe) — rapor ekte' }}"
}
},
"type": "n8n-nodes-base.slack",
"typeVersion": 2.4,
"position": [4800, 480],
"id": "a6a6a6a6-0006-4a06-8a06-000000000006",
"name": "Report: Send to User",
"credentials": {
"slackApi": {
"id": "OKgM8VkM05pJl9kU",
"name": "Tarla Slack Account"
}
}
}
],
"pinData": {},
"connections": {
"Report: Gather Inputs": {
"main": [[{ "node": "Report: Calc Thunderstorm Headers", "type": "main", "index": 0 }]]
},
"Report: Calc Thunderstorm Headers": {
"main": [[{ "node": "Report: Fetch Thunderstorms", "type": "main", "index": 0 }]]
},
"Report: Fetch Thunderstorms": {
"main": [[{ "node": "Report: Gemini Commentary", "type": "main", "index": 0 }]]
},
"Report: Gemini Commentary": {
"main": [[{ "node": "Report: Build Payload", "type": "main", "index": 0 }]]
},
"Report: Build Payload": {
"main": [[{ "node": "Report: Generate DOCX", "type": "main", "index": 0 }]]
},
"Report: Generate DOCX": {
"main": [[{ "node": "Report: Send to User", "type": "main", "index": 0 }]]
}
},
"active": false,
"settings": { "executionOrder": "v1" }
}

View File

@ -6,12 +6,15 @@ Keeping this isolated so the rest of the codebase never learns about n8n.
""" """
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timezone
from typing import Any from typing import Any
import pandas as pd import pandas as pd
from src.config import config from src.config import config
_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items")
_LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = { _LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = {
"lat": ["lat", "latitude"], "lat": ["lat", "latitude"],
@ -198,3 +201,108 @@ def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFram
turbine_df = _build_turbine_df(payload.get("turbines") or []) turbine_df = _build_turbine_df(payload.get("turbines") or [])
lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name) lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name)
return turbine_df, lightning_df return turbine_df, lightning_df
def _epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None:
if epoch_ms in (None, "", 0):
return None
try:
return (
datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%SZ")
)
except (TypeError, ValueError, OSError):
return None
def _exterior_to_wkt(exterior: Any) -> str | None:
"""Convert iklim.co `polygon.exterior` array into a WKT LINESTRING."""
if not isinstance(exterior, list) or not exterior:
return None
pairs: list[str] = []
for point in exterior:
if not isinstance(point, dict):
return None
lng = point.get("lng") if "lng" in point else point.get("longitude")
lat = point.get("lat") if "lat" in point else point.get("latitude")
if lng is None or lat is None:
return None
try:
pairs.append(f"{float(lng)} {float(lat)}")
except (TypeError, ValueError):
return None
if len(pairs) < 2:
return None
return f"LINESTRING({', '.join(pairs)})"
def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]:
"""
Accept any of these shapes coming from n8n and return a flat list of
storm-cell dicts:
* [ { "thunderstorms": [...] } ] (n8n HTTP item wrapper)
* { "thunderstorms": [...] } (single API response)
* [ {storm}, {storm}, ... ] (already flat)
* { "0": {storm}, "1": {storm} } (object-as-array)
"""
if raw is None:
return []
if isinstance(raw, list):
if raw and all(isinstance(item, dict) and any(k in item for k in _STORM_ENVELOPE_KEYS) for item in raw):
unwrapped: list[dict[str, Any]] = []
for item in raw:
for key in _STORM_ENVELOPE_KEYS:
if key in item and isinstance(item[key], list):
unwrapped.extend(s for s in item[key] if isinstance(s, dict))
break
return unwrapped
return [s for s in raw if isinstance(s, dict)]
if isinstance(raw, dict):
for key in _STORM_ENVELOPE_KEYS:
if key in raw and isinstance(raw[key], list):
return [s for s in raw[key] if isinstance(s, dict)]
return [v for v in raw.values() if isinstance(v, dict)]
return []
def normalize_storm_records(raw: Any) -> list[dict[str, Any]]:
"""
Translate iklim.co `/v1/thunderstorms/within` records into the shape that
`src.reporting.docx` and `src.visualization.storm_cells` expect.
Required output keys per record:
- cell_polygon_wkt
- effective_time / expire_time / creation_time (ISO 8601 UTC)
- lightning_severity
"""
out: list[dict[str, Any]] = []
for record in _unwrap_storm_envelope(raw):
cell = record.get("cell") if isinstance(record.get("cell"), dict) else {}
polygon = cell.get("polygon") if isinstance(cell.get("polygon"), dict) else {}
wkt = _exterior_to_wkt(polygon.get("exterior")) if polygon else None
if not wkt:
continue
effective = _epoch_ms_to_iso_utc(record.get("eventStartUtcEpoch"))
expire = _epoch_ms_to_iso_utc(record.get("eventEndUtcEpoch"))
created = _epoch_ms_to_iso_utc(record.get("insertedAtEpoch"))
normalized = dict(record)
normalized["cell_polygon_wkt"] = wkt
if effective:
normalized["effective_time"] = effective
if expire:
normalized["expire_time"] = expire
if created:
normalized["creation_time"] = created
severity = record.get("severity") or record.get("lightning_severity")
if severity is not None:
normalized["lightning_severity"] = str(severity).strip().lower()
out.append(normalized)
return out

View File

@ -24,7 +24,7 @@ from src.reporting import docx as docx_module
from src.reporting.docx import create_docx_report from src.reporting.docx import create_docx_report
from src.reporting.filename_utils import slugify_ascii_underscore from src.reporting.filename_utils import slugify_ascii_underscore
from report_service.adapter import apply_farm_config, build_dataframes from report_service.adapter import apply_farm_config, build_dataframes, normalize_storm_records
logging.basicConfig( logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"), level=os.getenv("LOG_LEVEL", "INFO"),
@ -118,7 +118,9 @@ async def generate(request: Request) -> Response:
logger.warning("Payload validation failed: %s", exc) logger.warning("Payload validation failed: %s", exc)
raise HTTPException(status_code=422, detail=str(exc)) from exc raise HTTPException(status_code=422, detail=str(exc)) from exc
storm_records = payload.get("storm_records") or None storm_records = normalize_storm_records(payload.get("storm_records")) or None
if storm_records is not None:
logger.info("Normalized %d storm record(s) for %s", len(storm_records), customer_name)
filename = _build_filename(payload) filename = _build_filename(payload)
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".docx") tmp_fd, tmp_path = tempfile.mkstemp(suffix=".docx")