333 lines
12 KiB
Python
333 lines
12 KiB
Python
"""
|
|
FastAPI microservice that wraps `create_docx_report()` for use by n8n.
|
|
|
|
Endpoints:
|
|
- GET /health liveness probe
|
|
- POST /generate accept payload JSON and return a DOCX (sync)
|
|
- POST /generate/async start background job, return job_id immediately
|
|
- GET /generate/async/{job_id} poll job status
|
|
- GET /generate/async/{job_id}/download download completed DOCX
|
|
|
|
Run locally:
|
|
uvicorn report_service.main:app --host 0.0.0.0 --port 8000
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hmac
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import threading
|
|
import uuid
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, FastAPI, Header, HTTPException, Request
|
|
from fastapi.responses import JSONResponse, Response
|
|
|
|
from src.reporting import docx as docx_module
|
|
from src.reporting.docx import create_docx_report
|
|
from src.reporting.filename_utils import slugify_ascii_underscore
|
|
|
|
from report_service.adapter import apply_farm_config, build_dataframes, normalize_storm_records
|
|
|
|
# Root logging is configured once at import time; the LOG_LEVEL environment
# variable overrides the INFO default.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=os.getenv("LOG_LEVEL", "INFO"),
)
# Service-wide logger used by every handler and background job in this module.
logger = logging.getLogger("report_service")

# ASGI application object; the route decorators below register on it.
app = FastAPI(version="1.0.0", title="Lightning Report Service")

# Media type sent with every generated Word document.
DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
|
|
|
|
|
class JobStatus(str, Enum):
    """Lifecycle states of a background report job.

    Members are also ``str`` instances, so a member compares equal to its
    wire value (``JobStatus.PENDING == "pending"``) and ``.value`` can be
    placed directly in a JSON response.
    """

    PENDING = "pending"  # registered, worker thread has not picked it up yet
    RUNNING = "running"  # worker thread is building the DOCX
    COMPLETE = "complete"  # DOCX written to disk and ready for download
    FAILED = "failed"  # generation raised; the message is kept on the job
|
|
|
|
|
|
@dataclass
class ReportJob:
    """Mutable record tracking one asynchronous report request.

    Only ``status`` is required; the remaining fields start empty and are
    filled in by the worker once the job completes or fails.
    """

    # Current lifecycle state of the job.
    status: JobStatus
    # Download filename, set when the job completes.
    filename: str | None = None
    # Filesystem path of the generated DOCX, set when the job completes.
    path: str | None = None
    # Human-readable failure message, set when the job fails.
    error: str | None = None
    # Customer name taken from the request payload.
    customer_name: str | None = None
    # Strike count reported back in the download response headers.
    n_strikes: int = 0
|
|
|
|
|
|
_jobs: dict[str, ReportJob] = {}
|
|
_jobs_lock = threading.Lock()
|
|
|
|
|
|
def require_report_token(x_report_token: str | None = Header(default=None)) -> None:
    """
    Shared-secret gate for the token-protected `/generate` endpoints.

    Fails closed: if `REPORT_SERVICE_TOKEN` is unset in the environment the
    service refuses every request rather than silently serving open traffic.

    Args:
        x_report_token: Value of the `X-Report-Token` request header, or
            ``None`` when the header is absent.

    Raises:
        HTTPException: 503 when `REPORT_SERVICE_TOKEN` is not configured;
            401 when the header is missing, empty, or does not match.
    """
    expected = os.getenv("REPORT_SERVICE_TOKEN")
    if not expected:
        logger.error("REPORT_SERVICE_TOKEN is not configured; refusing request")
        raise HTTPException(status_code=503, detail="Service is not configured")

    if not x_report_token:
        raise HTTPException(status_code=401, detail="Invalid or missing X-Report-Token")

    # hmac.compare_digest() only accepts ASCII-only str arguments and raises
    # TypeError otherwise, which would surface as a 500 instead of a 401 when
    # a client sends a header containing non-ASCII characters. Comparing the
    # encoded bytes avoids that; "surrogatepass" keeps the encoding total and
    # injective, so equality means exactly what str equality meant.
    supplied = x_report_token.encode("utf-8", "surrogatepass")
    wanted = expected.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(supplied, wanted):
        raise HTTPException(status_code=401, detail="Invalid or missing X-Report-Token")
|
|
|
|
|
|
@contextmanager
|
|
def _override_gemini_commentary(override_text: str | None):
|
|
"""
|
|
If n8n already called Gemini and forwarded the text, short-circuit
|
|
`generate_gemini_paragraph` so the downstream report uses it verbatim.
|
|
|
|
Restores the original function on exit even if the request fails.
|
|
"""
|
|
if not override_text:
|
|
yield
|
|
return
|
|
|
|
original = docx_module.generate_gemini_paragraph
|
|
docx_module.generate_gemini_paragraph = lambda _ctx, api_key=None: override_text
|
|
try:
|
|
yield
|
|
finally:
|
|
docx_module.generate_gemini_paragraph = original
|
|
|
|
|
|
def _build_filename(payload: dict[str, Any]) -> str:
    """Compose the download filename for a report.

    The result is the slugified customer name (``"report"`` when the payload
    has none), followed by the configured analysis start and end stamps when
    they are non-empty, followed by ``report.docx``; all joined with ``_``.
    """
    customer_slug = slugify_ascii_underscore(payload.get("customer_name") or "report")
    # Function-scope import: the config object is looked up on every call.
    from src.config import config

    def _compact(stamp):
        # Spaces become underscores; colons and dashes are dropped.
        return (stamp or "").replace(" ", "_").replace(":", "").replace("-", "")

    window = (_compact(config.analysis_start_date), _compact(config.analysis_end_date))
    pieces = [customer_slug, *(piece for piece in window if piece), "report.docx"]
    return "_".join(pieces)
|
|
|
|
|
|
def _pct_encode_utf8(text: str) -> str:
|
|
return "".join(f"%{b:02X}" for b in text.encode("utf-8"))
|
|
|
|
|
|
def _latin1_header_values(headers: dict[str, str]) -> dict[str, str]:
|
|
return {
|
|
k: v.encode("latin-1", errors="replace").decode("latin-1") for k, v in headers.items()
|
|
}
|
|
|
|
|
|
def _ascii_attachment_filename_from_stem(stem: str) -> str:
    """Turn a filename stem into an ASCII-only ``<name>.docx`` filename.

    The stem is slugified, any remaining non-ASCII characters are dropped,
    and leading/trailing ``.``, ``_`` and ``-`` are trimmed. If nothing is
    left the name falls back to ``report``.
    """
    ascii_only = slugify_ascii_underscore(stem).encode("ascii", "ignore").decode("ascii")
    trimmed = ascii_only.strip("._-")
    if not trimmed:
        trimmed = "report"
    return f"{trimmed}.docx"
|
|
|
|
|
|
def _content_disposition_attachment(filename: str) -> str:
    """Build a ``Content-Disposition: attachment`` header value for *filename*.

    The value carries two forms of the name: an ASCII-only ``filename="..."``
    fallback and a percent-encoded UTF-8 ``filename*=UTF-8''...`` form built
    from the full original filename.
    """
    # Strip a trailing ".docx" (any case) so the fallback helper can re-add it.
    if filename.lower().endswith(".docx"):
        stem = filename[: -len(".docx")]
    else:
        stem = filename

    fallback = _ascii_attachment_filename_from_stem(stem)
    # Final guard so the quoted fallback can only ever contain ASCII.
    fallback = "".join(ch for ch in fallback if ch.isascii()) or "report.docx"
    extended = _pct_encode_utf8(filename)
    return f'attachment; filename="{fallback}"; filename*=UTF-8\'\'{extended}'
|
|
|
|
|
|
async def _read_payload(request: Request) -> dict[str, Any]:
    """Parse the request body as JSON and require a top-level object.

    Raises:
        HTTPException: 400 when the body cannot be parsed as JSON, or when it
            parses to anything other than a JSON object.
    """
    try:
        body = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {exc}") from exc

    if isinstance(body, dict):
        return body
    raise HTTPException(status_code=400, detail="Request body must be a JSON object")
|
|
|
|
|
|
def _build_docx_file(payload: dict[str, Any]) -> tuple[str, str, int]:
    """Generate the DOCX report for *payload* into a fresh temporary file.

    Steps: apply the payload's farm configuration, build the turbine and
    lightning data frames, normalise any storm records, then call
    `create_docx_report` with the Gemini commentary override (if the payload
    carries ``gemini_text``) active.

    Returns:
        ``(tmp_path, filename, n_strikes)``: the path of the generated file,
        the download filename, and the strike count taken from the payload.
        The caller owns the temporary file and is responsible for deleting it.

    Raises:
        ValueError: when ``n_strikes`` cannot be interpreted as an integer.
            Callers in this module report ``ValueError`` as a validation
            failure rather than a server error.
        Exception: anything raised while generating the report; the temporary
            file is removed before the exception propagates.
    """
    customer_name = payload.get("customer_name") or "<unknown>"

    raw_strikes = payload.get("n_strikes")
    try:
        n_strikes = int(raw_strikes or 0)
    except TypeError as exc:
        # int() raises TypeError (not ValueError) for containers such as lists
        # or dicts; surface it as the validation error callers already handle.
        raise ValueError(
            f"n_strikes must be a number, got {type(raw_strikes).__name__}"
        ) from exc

    # Only used for the log line below; a malformed `turbines` value must not
    # crash logging before the payload has been validated.
    turbines = payload.get("turbines") or []
    n_turbines = len(turbines) if hasattr(turbines, "__len__") else 0

    logger.info(
        "Generating report for customer=%s language=%s n_strikes=%s n_turbines=%s",
        customer_name,
        payload.get("language") or "en",
        n_strikes,
        n_turbines,
    )

    apply_farm_config(payload)
    turbine_df, lightning_df = build_dataframes(payload)
    # An empty normalisation result is passed on as None.
    storm_records = normalize_storm_records(payload.get("storm_records")) or None
    if storm_records is not None:
        logger.info("Normalized %d storm record(s) for %s", len(storm_records), customer_name)

    filename = _build_filename(payload)
    # Only the path is needed; the descriptor is closed right away and the
    # report writer opens the file itself.
    tmp_fd, tmp_path = tempfile.mkstemp(suffix=".docx")
    os.close(tmp_fd)

    try:
        with _override_gemini_commentary(payload.get("gemini_text")):
            create_docx_report(
                tmp_path,
                turbine_df,
                lightning_df,
                storm_data_path=None,
                storm_data_records=storm_records,
            )
    except Exception:
        # Best-effort cleanup: do not leave a partial file behind, and never
        # let a cleanup failure mask the original error.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return tmp_path, filename, n_strikes
|
|
|
|
|
|
def _docx_response(data: bytes, filename: str, customer_name: str, n_strikes: int) -> Response:
    """Wrap DOCX bytes in a download response with report metadata headers.

    Besides ``Content-Disposition`` the response carries ``X-Report-Filename``
    (ASCII-only filename), ``X-Report-Customer`` (percent-encoded UTF-8) and
    ``X-Report-Strikes``. All header values are forced into Latin-1 before
    being handed to the response.
    """
    has_docx_suffix = filename.lower().endswith(".docx")
    stem = filename[: -len(".docx")] if has_docx_suffix else filename

    plain_name = _ascii_attachment_filename_from_stem(stem)
    plain_name = "".join(ch for ch in plain_name if ch.isascii()) or "report.docx"
    encoded_customer = _pct_encode_utf8(str(customer_name))

    raw_headers = {
        "Content-Disposition": _content_disposition_attachment(filename),
        "X-Report-Filename": plain_name,
        "X-Report-Customer": encoded_customer,
        "X-Report-Strikes": str(n_strikes),
    }
    return Response(
        content=data,
        media_type=DOCX_MIME,
        headers=_latin1_header_values(raw_headers),
    )
|
|
|
|
|
|
def _run_report_job(job_id: str, payload: dict[str, Any]) -> None:
    """Worker body for one async job: build the report and record the outcome.

    The job is marked RUNNING, the DOCX is generated, and the registry entry
    is then updated to COMPLETE (with filename, path, customer and strike
    count) or FAILED (with an error message). Nothing is raised to the caller;
    every failure is logged and stored on the job. If the job id is no longer
    registered the function returns quietly, removing any file it produced.
    """
    customer_name = payload.get("customer_name") or "<unknown>"

    def _record_failure(message: str) -> None:
        # Flag the job as failed, if it is still registered.
        with _jobs_lock:
            entry = _jobs.get(job_id)
            if entry is not None:
                entry.status = JobStatus.FAILED
                entry.error = message

    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None:
            return
        job.status = JobStatus.RUNNING

    try:
        # Generation runs outside the lock so status polls are not blocked.
        tmp_path, filename, n_strikes = _build_docx_file(payload)
        with _jobs_lock:
            job = _jobs.get(job_id)
            if job is None:
                # Nobody can download the file any more; discard it.
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                return
            job.status = JobStatus.COMPLETE
            job.filename = filename
            job.path = tmp_path
            job.customer_name = str(customer_name)
            job.n_strikes = n_strikes
        logger.info("Async job %s completed: %s for %s", job_id, filename, customer_name)
    except ValueError as exc:
        # Validation problems are stored verbatim.
        logger.warning("Async job %s payload validation failed: %s", job_id, exc)
        _record_failure(str(exc))
    except Exception as exc:
        logger.exception("Async job %s failed for %s", job_id, customer_name)
        _record_failure(f"Report generation failed: {exc}")
|
|
|
|
|
|
def _get_job_or_404(job_id: str) -> ReportJob:
    """Look up a registered job by id.

    Raises:
        HTTPException: 404 when no job with that id is registered.
    """
    with _jobs_lock:
        found = _jobs.get(job_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Report job not found")
|
|
|
|
|
|
@app.get("/health")
def health() -> JSONResponse:
    """Liveness probe: report the service name and application version."""
    status = {"ok": True, "service": "lightning-report", "version": app.version}
    return JSONResponse(status)
|
|
|
|
|
|
@app.post("/generate", dependencies=[Depends(require_report_token)])
async def generate(request: Request) -> Response:
    """Generate a report synchronously and return the DOCX in the response.

    Raises:
        HTTPException: 400 for an unreadable or non-object JSON body, 422 when
            report generation raises ``ValueError`` (payload validation), and
            500 for any other generation failure.
    """
    payload = await _read_payload(request)
    customer_name = payload.get("customer_name") or "<unknown>"

    # Initialised up front so the cleanup below can tell "no file was created"
    # apart from "file created", instead of catching UnboundLocalError.
    tmp_path: str | None = None
    try:
        tmp_path, filename, n_strikes = _build_docx_file(payload)
        with open(tmp_path, "rb") as fh:
            data = fh.read()
    except ValueError as exc:
        logger.warning("Payload validation failed: %s", exc)
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Report generation failed for %s", customer_name)
        raise HTTPException(status_code=500, detail=f"Report generation failed: {exc}") from exc
    finally:
        # The bytes are already in memory (or the request failed); either way
        # the temporary file is no longer needed. Removal is best-effort.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    logger.info("Generated %s (%d bytes) for %s", filename, len(data), customer_name)
    return _docx_response(data, filename, str(customer_name), n_strikes)
|
|
|
|
|
|
@app.post("/generate/async", dependencies=[Depends(require_report_token)])
async def generate_async(request: Request) -> JSONResponse:
    """Register a report job, start it on a daemon thread, and return its id.

    The response body is ``{"job_id": ..., "status": "pending"}``; the job id
    is a freshly generated UUID4 string.
    """
    payload = await _read_payload(request)
    job_id = str(uuid.uuid4())

    # Register the job before the worker starts so the worker can find it.
    pending_job = ReportJob(
        status=JobStatus.PENDING,
        customer_name=str(payload.get("customer_name") or ""),
    )
    with _jobs_lock:
        _jobs[job_id] = pending_job

    worker = threading.Thread(target=_run_report_job, args=(job_id, payload), daemon=True)
    worker.start()
    logger.info("Queued async report job %s for %s", job_id, payload.get("customer_name"))

    return JSONResponse({"job_id": job_id, "status": JobStatus.PENDING.value})
|
|
|
|
|
|
@app.get("/generate/async/{job_id}", dependencies=[Depends(require_report_token)])
async def generate_async_status(job_id: str) -> JSONResponse:
    """Report the current status of an async job.

    The body always contains ``job_id`` and ``status``; ``filename`` is added
    for a completed job that has one, and ``error`` for a failed job that
    recorded a message. Unknown ids produce a 404.
    """
    job = _get_job_or_404(job_id)
    report: dict[str, Any] = {"job_id": job_id, "status": job.status.value}

    is_complete = job.status == JobStatus.COMPLETE
    if is_complete and job.filename:
        report["filename"] = job.filename

    is_failed = job.status == JobStatus.FAILED
    if is_failed and job.error:
        report["error"] = job.error

    return JSONResponse(report)
|
|
|
|
|
|
@app.get("/generate/async/{job_id}/download", dependencies=[Depends(require_report_token)])
async def generate_async_download(job_id: str) -> Response:
    """Return the DOCX produced by a completed async job.

    The download is one-shot: the temporary file is deleted and the job is
    dropped from the registry whether or not reading the file succeeds.

    Raises:
        HTTPException: 404 for an unknown job id, 409 when the job has not
            completed or has no file path recorded.
    """
    job = _get_job_or_404(job_id)
    is_ready = job.status == JobStatus.COMPLETE and bool(job.path)
    if not is_ready:
        raise HTTPException(status_code=409, detail=f"Report job is not complete (status={job.status.value})")

    try:
        with open(job.path, "rb") as docx_file:
            data = docx_file.read()
    finally:
        # Best-effort removal of the temp file, then forget the job.
        try:
            os.unlink(job.path)
        except OSError:
            pass
        with _jobs_lock:
            _jobs.pop(job_id, None)

    filename = job.filename if job.filename else "report.docx"
    customer_name = job.customer_name if job.customer_name else "<unknown>"
    logger.info("Downloaded async job %s: %s (%d bytes)", job_id, filename, len(data))
    return _docx_response(data, filename, customer_name, job.n_strikes)
|