From 543c3168420e98af09dbb2501c1af2dc99febd15 Mon Sep 17 00:00:00 2001 From: erdemerikci Date: Wed, 29 Apr 2026 16:09:11 +0300 Subject: [PATCH] Remove n8n_report_branch.json and integrate storm record normalization in report_service. Adapter now includes functions for converting epoch timestamps to ISO format and unwrapping storm data from various API response shapes. Updated main.py to utilize the new normalization function for storm records. --- n8n_report_branch.json | 197 -------------------------------------- report_service/adapter.py | 108 +++++++++++++++++++++ report_service/main.py | 6 +- 3 files changed, 112 insertions(+), 199 deletions(-) delete mode 100644 n8n_report_branch.json diff --git a/n8n_report_branch.json b/n8n_report_branch.json deleted file mode 100644 index 4a799a7..0000000 --- a/n8n_report_branch.json +++ /dev/null @@ -1,197 +0,0 @@ -{ - "name": "Lightning Report Branch (paste into Lightning_Report_Automatic)", - "nodes": [ - { - "parameters": { - "assignments": { - "assignments": [ - { "id": "rpt-cid", "name": "customer_id", "value": "={{ $('Loop Over Items').item.json.id }}", "type": "string" }, - { "id": "rpt-cname", "name": "customer_name", "value": "={{ $('Loop Over Items').item.json.customer_name }}", "type": "string" }, - { "id": "rpt-tz", "name": "timezone", "value": "={{ $('Loop Over Items').item.json.timezone || 'Europe/Istanbul' }}", "type": "string" }, - { "id": "rpt-clat", "name": "centroid_lat", "value": "={{ $('Centroid & Distance Ring calculation').item.json.centroid_latitude }}", "type": "number" }, - { "id": "rpt-clon", "name": "centroid_lon", "value": "={{ $('Centroid & Distance Ring calculation').item.json.centroid_longitude }}", "type": "number" }, - { "id": "rpt-bnd", "name": "boundary_m", "value": "={{ $('Centroid & Distance Ring calculation').item.json.monitoring_boundary_m }}","type": "number" }, - { "id": "rpt-rings", "name": "rings", "value": "={{ $('Centroid & Distance Ring calculation').item.json.rings }}", "type": "object" }, - { "id": "rpt-rcolors", "name": "ring_colors", "value": "={{ [\"#B71C1C\", \"#F94144\", \"#F8961E\", \"#90BE6D\"] }}", "type": "array" }, - { "id": "rpt-ts", "name": "t_start", "value": "={{ $('Logic Gate').item.json.tStart }}", "type": "number" }, - { "id": "rpt-te", "name": "t_end", "value": "={{ $('Logic Gate').item.json.tLast }}", "type": "number" }, - { "id": "rpt-ns", "name": "n_strikes", "value": "={{ $('Logic Gate').item.json.allStrikes.length }}", "type": "number" }, - { "id": "rpt-strikes", "name": "strikes", "value": "={{ $('Logic Gate').item.json.allStrikes }}", "type": "array" }, - { "id": "rpt-turbines", "name": "turbines", "value": "={{ $('Get Customer Wind Turbines').all().map(t => t.json) }}", "type": "array" } - ] - }, - "options": {} - }, - "type": "n8n-nodes-base.set", - "typeVersion": 3.4, - "position": [3440, 480], - "id": "a1a1a1a1-0001-4a01-8a01-000000000001", - "name": "Report: Gather Inputs" - }, - { - "parameters": { - "jsCode": "const crypto = require('crypto');\nconst HMAC_SECRET = 'c88f845bd6d520ded507ef6b02efc223019ccf68f41d9070705712d480ba5166';\nconst URI = '/v1/thunderstorms/within';\n\nconst ctx = $('Report: Gather Inputs').item.json;\nconst auth = $('Restore Credentials').all().pop().json;\n\nif (!ctx.t_start || !ctx.t_end) {\n throw new Error('Missing storm timestamps from Logic Gate.');\n}\n\nconst durationSeconds = Math.max(600, Math.floor((ctx.t_end - ctx.t_start) / 1000));\nconst timestamp = Date.now().toString();\n\nconst bodyPayload = {\n latitude: Number(Number(ctx.centroid_lat).toFixed(6)),\n longitude: Number(Number(ctx.centroid_lon).toFixed(6)),\n radius: parseInt(ctx.boundary_m, 10),\n backwardInterval: durationSeconds,\n endTimeEpoch: Number(ctx.t_end),\n intersectsWith: 'THREAT_POLYGON',\n pageNumber: 0,\n pageSize: 100\n};\n\nconst bodyString = JSON.stringify(bodyPayload);\nconst dataToSign = `POST|${URI}|${timestamp}|${bodyString}`;\nconst signature = crypto.createHmac('sha256', HMAC_SECRET).update(dataToSign).digest('hex').toLowerCase();\n\nreturn [{\n json: {\n requestBody: bodyPayload,\n headers: {\n 'X-Signature': signature,\n 'X-Timestamp': timestamp,\n 'X-Nonce': crypto.randomUUID(),\n 'X-Idempotency-Key': crypto.randomUUID(),\n 'Authorization': 'Bearer ' + auth.accessToken,\n 'Content-Type': 'application/json'\n }\n }\n}];" - }, - "type": "n8n-nodes-base.code", - "typeVersion": 2, - "position": [3680, 480], - "id": "a2a2a2a2-0002-4a02-8a02-000000000002", - "name": "Report: Calc Thunderstorm Headers" - }, - { - "parameters": { - "method": "POST", - "url": "https://api-test.iklim.co/v1/thunderstorms/within", - "sendHeaders": true, - "headerParameters": { - "parameters": [ - { "name": "Authorization", "value": "={{ $json.headers.Authorization }}" }, - { "name": "X-Signature", "value": "={{ $json.headers['X-Signature'] }}" }, - { "name": "X-Timestamp", "value": "={{ $json.headers['X-Timestamp'] }}" }, - { "name": "X-Nonce", "value": "={{ $json.headers['X-Nonce'] }}" }, - { "name": "X-Idempotency-Key", "value": "={{ $json.headers['X-Idempotency-Key'] }}" }, - { "name": "Content-Type", "value": "={{ $json.headers['Content-Type'] }}" }, - { "name": "Accept", "value": "={{ $json.headers['Content-Type'] }}" } - ] - }, - "sendBody": true, - "specifyBody": "json", - "jsonBody": "={{ $json.requestBody }}", - "options": {} - }, - "type": "n8n-nodes-base.httpRequest", - "typeVersion": 4.3, - "position": [3920, 480], - "id": "a3a3a3a3-0003-4a03-8a03-000000000003", - "name": "Report: Fetch Thunderstorms", - "onError": "continueRegularOutput" - }, - { - "parameters": { - "method": "POST", - "url": "=https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={{ $env.GEMINI_API_KEY }}", - "sendBody": true, - "specifyBody": "json", - "jsonBody": "={{ { contents: [ { parts: [ { text: 'Sen bir yildirim risk analisti raportorusun. Asagidaki verileri kullanarak Turkce, 2-3 paragraf, profesyonel bir ozet yaz. Sayilari dogal dilde ver; asiri teknik olma; santral adini kullan; sure, toplam darbe ve yogunluk hakkinda yorum yap.\\n\\nSantral: ' + $('Report: Gather Inputs').item.json.customer_name + '\\nZaman dilimi: ' + $('Report: Gather Inputs').item.json.timezone + '\\nFirtina baslangici (epoch ms): ' + $('Report: Gather Inputs').item.json.t_start + '\\nFirtina bitisi (epoch ms): ' + $('Report: Gather Inputs').item.json.t_end + '\\nToplam yildirim darbesi: ' + $('Report: Gather Inputs').item.json.n_strikes + '\\nIzleme yaricapi (m): ' + $('Report: Gather Inputs').item.json.boundary_m } ] } ] } }}", - "options": {} - }, - "type": "n8n-nodes-base.httpRequest", - "typeVersion": 4.3, - "position": [4160, 480], - "id": "a4a4a4a4-0004-4a04-8a04-000000000004", - "name": "Report: Gemini Commentary", - "onError": "continueRegularOutput" - }, - { - "parameters": { - "assignments": { - "assignments": [ - { "id": "bp-cid", "name": "customer_id", "value": "={{ $('Report: Gather Inputs').item.json.customer_id }}", "type": "string" }, - { "id": "bp-cname", "name": "customer_name", "value": "={{ $('Report: Gather Inputs').item.json.customer_name }}", "type": "string" }, - { "id": "bp-tz", "name": "timezone", "value": "={{ $('Report: Gather Inputs').item.json.timezone }}", "type": "string" }, - { "id": "bp-clat", "name": "centroid_lat", "value": "={{ $('Report: Gather Inputs').item.json.centroid_lat }}", "type": "number" }, - { "id": "bp-clon", "name": "centroid_lon", "value": "={{ $('Report: Gather Inputs').item.json.centroid_lon }}", "type": "number" }, - { "id": "bp-bnd", "name": "boundary_m", "value": "={{ $('Report: Gather Inputs').item.json.boundary_m }}", "type": "number" }, - { "id": "bp-rings", "name": "rings", "value": "={{ $('Report: Gather Inputs').item.json.rings }}", "type": "object" }, - { "id": "bp-rcolors", "name": "ring_colors", "value": "={{ $('Report: Gather Inputs').item.json.ring_colors }}", "type": "array" }, - { "id": "bp-ts", "name": "t_start", "value": "={{ $('Report: Gather Inputs').item.json.t_start }}", "type": "number" }, - { "id": "bp-te", "name": "t_end", "value": "={{ $('Report: Gather Inputs').item.json.t_end }}", "type": "number" }, - { "id": "bp-ns", "name": "n_strikes", "value": "={{ $('Report: Gather Inputs').item.json.n_strikes }}", "type": "number" }, - { "id": "bp-strikes", "name": "strikes", "value": "={{ $('Report: Gather Inputs').item.json.strikes }}", "type": "array" }, - { "id": "bp-turbines", "name": "turbines", "value": "={{ $('Report: Gather Inputs').item.json.turbines }}", "type": "array" }, - { "id": "bp-gem", "name": "gemini_text", "value": "={{ $('Report: Gemini Commentary').item.json?.candidates?.[0]?.content?.parts?.[0]?.text || '' }}", "type": "string" }, - { "id": "bp-storms", "name": "storm_records", "value": "={{ $('Report: Fetch Thunderstorms').item.json?.thunderstorms || $('Report: Fetch Thunderstorms').item.json?.data || [] }}", "type": "array" } - ] - }, - "options": {} - }, - "type": "n8n-nodes-base.set", - "typeVersion": 3.4, - "position": [4320, 480], - "id": "a8a8a8a8-0008-4a08-8a08-000000000008", - "name": "Report: Build Payload" - }, - { - "parameters": { - "method": "POST", - "url": "={{ $env.REPORT_SERVICE_URL || 'http://report-service:8000' }}/generate", - "sendHeaders": true, - "headerParameters": { - "parameters": [ - { "name": "Content-Type", "value": "application/json" }, - { "name": "Accept", "value": "application/vnd.openxmlformats-officedocument.wordprocessingml.document" } - ] - }, - "sendBody": true, - "specifyBody": "json", - "jsonBody": "={{ $json }}", - "options": { - "timeout": 300000, - "response": { - "response": { - "responseFormat": "file", - "outputPropertyName": "report" - } - } - } - }, - "type": "n8n-nodes-base.httpRequest", - "typeVersion": 4.3, - "position": [4560, 480], - "id": "a5a5a5a5-0005-4a05-8a05-000000000005", - "name": "Report: Generate DOCX" - }, - { - "parameters": { - "resource": "file", - "operation": "upload", - "binaryData": true, - "binaryPropertyName": "report", - "channelId": { - "__rl": true, - "value": "REPLACE_WITH_USER_ID", - "mode": "id", - "cachedResultName": "DM target user" - }, - "options": { - "fileName": "={{ $binary.report.fileName || ($('Report: Build Payload').item.json.customer_name + '_report.docx') }}", - "initialComment": "={{ '⚡ ' + $('Report: Build Payload').item.json.customer_name + ' — yeni firtina raporu (' + $('Report: Build Payload').item.json.n_strikes + ' darbe) — rapor ekte' }}" - } - }, - "type": "n8n-nodes-base.slack", - "typeVersion": 2.4, - "position": [4800, 480], - "id": "a6a6a6a6-0006-4a06-8a06-000000000006", - "name": "Report: Send to User", - "credentials": { - "slackApi": { - "id": "OKgM8VkM05pJl9kU", - "name": "Tarla Slack Account" - } - } - } - ], - "pinData": {}, - "connections": { - "Report: Gather Inputs": { - "main": [[{ "node": "Report: Calc Thunderstorm Headers", "type": "main", "index": 0 }]] - }, - "Report: Calc Thunderstorm Headers": { - "main": [[{ "node": "Report: Fetch Thunderstorms", "type": "main", "index": 0 }]] - }, - "Report: Fetch Thunderstorms": { - "main": [[{ "node": "Report: Gemini Commentary", "type": "main", "index": 0 }]] - }, - "Report: Gemini Commentary": { - "main": [[{ "node": "Report: Build Payload", "type": "main", "index": 0 }]] - }, - "Report: Build Payload": { - "main": [[{ "node": "Report: Generate DOCX", "type": "main", "index": 0 }]] - }, - "Report: Generate DOCX": { - "main": [[{ "node": "Report: Send to User", "type": "main", "index": 0 }]] - } - }, - "active": false, - "settings": { "executionOrder": "v1" } -} diff --git a/report_service/adapter.py b/report_service/adapter.py index 8d401c9..f9a8532 100644 --- a/report_service/adapter.py +++ b/report_service/adapter.py @@ -6,12 +6,15 @@ Keeping this isolated so the rest of the codebase never learns about n8n. """ from __future__ import annotations +from datetime import datetime, timezone from typing import Any import pandas as pd from src.config import config +_STORM_ENVELOPE_KEYS = ("thunderstorms", "cells", "storms", "data", "items") + _LIGHTNING_COLUMN_ALIASES: dict[str, list[str]] = { "lat": ["lat", "latitude"], @@ -198,3 +201,108 @@ def build_dataframes(payload: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFram turbine_df = _build_turbine_df(payload.get("turbines") or []) lightning_df = _build_lightning_df(payload.get("strikes") or [], timezone_name) return turbine_df, lightning_df + + +def _epoch_ms_to_iso_utc(epoch_ms: Any) -> str | None: + if epoch_ms in (None, "", 0): + return None + try: + return ( + datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%SZ") + ) + except (TypeError, ValueError, OSError): + return None + + +def _exterior_to_wkt(exterior: Any) -> str | None: + """Convert iklim.co `polygon.exterior` array into a WKT LINESTRING.""" + if not isinstance(exterior, list) or not exterior: + return None + pairs: list[str] = [] + for point in exterior: + if not isinstance(point, dict): + return None + lng = point.get("lng") if "lng" in point else point.get("longitude") + lat = point.get("lat") if "lat" in point else point.get("latitude") + if lng is None or lat is None: + return None + try: + pairs.append(f"{float(lng)} {float(lat)}") + except (TypeError, ValueError): + return None + if len(pairs) < 2: + return None + return f"LINESTRING({', '.join(pairs)})" + + +def _unwrap_storm_envelope(raw: Any) -> list[dict[str, Any]]: + """ + Accept any of these shapes coming from n8n and return a flat list of + storm-cell dicts: + + * [ { "thunderstorms": [...] } ] (n8n HTTP item wrapper) + * { "thunderstorms": [...] } (single API response) + * [ {storm}, {storm}, ... ] (already flat) + * { "0": {storm}, "1": {storm} } (object-as-array) + """ + if raw is None: + return [] + + if isinstance(raw, list): + if raw and all(isinstance(item, dict) and any(k in item for k in _STORM_ENVELOPE_KEYS) for item in raw): + unwrapped: list[dict[str, Any]] = [] + for item in raw: + for key in _STORM_ENVELOPE_KEYS: + if key in item and isinstance(item[key], list): + unwrapped.extend(s for s in item[key] if isinstance(s, dict)) + break + return unwrapped + return [s for s in raw if isinstance(s, dict)] + + if isinstance(raw, dict): + for key in _STORM_ENVELOPE_KEYS: + if key in raw and isinstance(raw[key], list): + return [s for s in raw[key] if isinstance(s, dict)] + return [v for v in raw.values() if isinstance(v, dict)] + + return [] + + +def normalize_storm_records(raw: Any) -> list[dict[str, Any]]: + """ + Translate iklim.co `/v1/thunderstorms/within` records into the shape that + `src.reporting.docx` and `src.visualization.storm_cells` expect. + + Required output keys per record: + - cell_polygon_wkt + - effective_time / expire_time / creation_time (ISO 8601 UTC) + - lightning_severity + """ + out: list[dict[str, Any]] = [] + for record in _unwrap_storm_envelope(raw): + cell = record.get("cell") if isinstance(record.get("cell"), dict) else {} + polygon = cell.get("polygon") if isinstance(cell.get("polygon"), dict) else {} + wkt = _exterior_to_wkt(polygon.get("exterior")) if polygon else None + if not wkt: + continue + + effective = _epoch_ms_to_iso_utc(record.get("eventStartUtcEpoch")) + expire = _epoch_ms_to_iso_utc(record.get("eventEndUtcEpoch")) + created = _epoch_ms_to_iso_utc(record.get("insertedAtEpoch")) + + normalized = dict(record) + normalized["cell_polygon_wkt"] = wkt + if effective: + normalized["effective_time"] = effective + if expire: + normalized["expire_time"] = expire + if created: + normalized["creation_time"] = created + + severity = record.get("severity") or record.get("lightning_severity") + if severity is not None: + normalized["lightning_severity"] = str(severity).strip().lower() + + out.append(normalized) + return out diff --git a/report_service/main.py b/report_service/main.py index 1f18bd9..b0a51c4 100644 --- a/report_service/main.py +++ b/report_service/main.py @@ -24,7 +24,7 @@ from src.reporting import docx as docx_module from src.reporting.docx import create_docx_report from src.reporting.filename_utils import slugify_ascii_underscore -from report_service.adapter import apply_farm_config, build_dataframes +from report_service.adapter import apply_farm_config, build_dataframes, normalize_storm_records logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), @@ -118,7 +118,9 @@ async def generate(request: Request) -> Response: logger.warning("Payload validation failed: %s", exc) raise HTTPException(status_code=422, detail=str(exc)) from exc - storm_records = payload.get("storm_records") or None + storm_records = normalize_storm_records(payload.get("storm_records")) or None + if storm_records is not None: + logger.info("Normalized %d storm record(s) for %s", len(storm_records), customer_name) filename = _build_filename(payload) tmp_fd, tmp_path = tempfile.mkstemp(suffix=".docx")