import json import pandas as pd import numpy as np from datetime import datetime from typing import Union, Dict, Any, Optional, List from zoneinfo import ZoneInfo import logging import re # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def format_date_ddmmyyyy(dt: datetime) -> str: return dt.strftime('%d-%m-%Y') def format_datetime_ddmmyyyy_hhmmss(dt: datetime) -> str: return dt.strftime('%d-%m-%Y %H:%M:%S') def format_datetime_ddmmyyyy_hhmm(dt: datetime) -> str: return dt.strftime('%d-%m-%Y %H:%M') def get_utc_offset_label(timezone_name: Optional[str]) -> Optional[str]: if not timezone_name: return None try: tz = ZoneInfo(timezone_name) dt = datetime.now(tz) offset = dt.utcoffset() if offset is None: return None total_seconds = int(offset.total_seconds()) hours = total_seconds // 3600 if hours >= 0: return f"UTC+{hours}" return f"UTC{hours}" except Exception: return None def now_in_timezone(timezone_name: Optional[str]) -> datetime: if not timezone_name: return datetime.now() try: return datetime.now(ZoneInfo(timezone_name)) except Exception: return datetime.now() def format_datetime_to_local_display(value: Optional[str], timezone_name: Optional[str] = None) -> str: if not value or str(value).strip() == '' or str(value).strip().upper() == 'N/A': return 'N/A' s = str(value).strip() try: ts = pd.to_datetime(s, utc=True) if timezone_name: ts = ts.tz_convert(ZoneInfo(timezone_name)).tz_localize(None) else: ts = ts.to_pydatetime().replace(tzinfo=None) dt = ts.to_pydatetime() if hasattr(ts, 'to_pydatetime') else ts return dt.strftime('%d-%m-%Y %H:%M') except Exception: return s[:19] if len(s) >= 19 else s def parse_period_string_to_datetime(value: Optional[str]) -> Optional[datetime]: if value is None: return None value_str = str(value).strip() if not value_str: return None try: if re.fullmatch(r"\d{2}-\d{2}-\d{4}", value_str): return datetime.strptime(value_str, '%d-%m-%Y') if re.match(r"\d{2}-\d{2}-\d{4}\s+\d", value_str): try: return datetime.strptime(value_str[:19], '%d-%m-%Y %H:%M:%S') except ValueError: try: return datetime.strptime(value_str[:16], '%d-%m-%Y %H:%M') except ValueError: pass ts = pd.to_datetime(value_str, errors='raise') if isinstance(ts, pd.Timestamp): if ts.tzinfo is not None: ts = ts.tz_convert('UTC').tz_localize(None) return ts.to_pydatetime() except Exception as e: logger.debug(f"parse_period_string_to_datetime failed for {value_str}: {e}") return None return None def normalize_local_time_to_timezone( df: pd.DataFrame, column: str, timezone_name: Optional[str], ) -> pd.DataFrame: if len(df) == 0 or not timezone_name: return df tz = ZoneInfo(timezone_name) df = df.copy() df[column] = pd.to_datetime(df[column], utc=True, errors='coerce') df = df[~df[column].isna()] if len(df) == 0: return df df[column] = df[column].dt.tz_convert(tz).dt.tz_localize(None) return df def format_period_display_for_report(start_value: Optional[str], end_value: Optional[str]) -> tuple[str, str]: def _format_one(val: Optional[str]) -> str: if not val or not str(val).strip(): return "" s = str(val).strip() try: if re.fullmatch(r"\d{2}-\d{2}-\d{4}", s): return s if "T" in s or "Z" in s: ts = pd.to_datetime(s, utc=True) local_dt = ts.to_pydatetime().astimezone(None) return local_dt.strftime('%d-%m-%Y %H:%M') ts = pd.to_datetime(s, errors='raise') if isinstance(ts, pd.Timestamp): if ts.tzinfo is not None: ts = ts.tz_localize(None) dt = ts.to_pydatetime() return dt.strftime('%d-%m-%Y %H:%M') return s except Exception: return s start_display = _format_one(start_value) if start_value else "" end_display = _format_one(end_value) if end_value else "" return start_display, end_display def get_analysis_radius_m() -> int: from .config import config rings = config.distance_rings or [] outermost_ring = int(max(rings)) if rings else 0 boundary = config.analysis_boundary_m if isinstance(boundary, (int, float)) and boundary > 0: if outermost_ring > 0: return min(int(boundary), outermost_ring) return int(boundary) return outermost_ring def get_storm_monitoring_radius_km() -> float: """Outer radius used when filtering thunderstorm cells for the report.""" from .config import config boundary = config.analysis_boundary_m if isinstance(boundary, (int, float)) and boundary > 0: return float(boundary) / 1000.0 rings = config.distance_rings or [] if rings: return float(max(rings)) / 1000.0 return 50.0 def get_turbine_color_by_fixed_intervals(risk_log_value: float) -> str: """ Get turbine color based on fixed risk score intervals. Uses consistent color coding across all groups and tables. Args: risk_log_value: Log-transformed risk score Returns: Color string for the turbine """ # Define fixed risk intervals and corresponding colors # Using the new color palette: F94144, F3722C, F8961E, F9C74F, 90BE6D, 43AA8B, 577590 if risk_log_value < 0.1: return '#577590' elif risk_log_value < 0.2: return '#43AA8B' elif risk_log_value < 0.4: return '#90BE6D' elif risk_log_value < 0.6: return '#F9C74F' elif risk_log_value < 0.8: return '#F8961E' elif risk_log_value < 1.0: return '#F3722C' elif risk_log_value < 1.2: return '#F94144' elif risk_log_value < 1.4: return '#D32F2F' else: return '#B71C1C' def get_risk_definition_by_fixed_intervals( risk_log_value: float, language: str | None = None, ) -> str: from src.reporting.strings import get_risk_definition, get_report_language lang = language if language is not None else get_report_language() return get_risk_definition(risk_log_value, lang) def get_turbine_colors_by_fixed_intervals(risk_log_values: List[float]) -> List[str]: """ Get turbine colors for a list of risk scores based on fixed intervals. Args: risk_log_values: List of log-transformed risk scores Returns: List of color strings for the turbines """ return [get_turbine_color_by_fixed_intervals(risk_log) for risk_log in risk_log_values] def safe_datetime_conversion(time_str: str) -> Optional[datetime]: """ Safely convert string to datetime with error handling. Args: time_str: String representation of datetime Returns: datetime object or None if conversion fails """ if not time_str or pd.isna(time_str): return None # Try different datetime formats formats = [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%d' ] for fmt in formats: try: return datetime.strptime(time_str[:19], fmt) except ValueError: continue # Try pandas parsing as fallback parsed = None try: parsed = pd.to_datetime(time_str, errors='coerce') except Exception: parsed = None if isinstance(parsed, pd.Timestamp) and not pd.isna(parsed): return parsed.to_pydatetime() if isinstance(time_str, datetime): return time_str logger.error(f"Failed to convert datetime: {time_str}") return None def load_json_data(file_path: str) -> Dict[str, Any]: """ Generic JSON loader with error handling. Args: file_path: Path to JSON file Returns: Dictionary containing JSON data Raises: FileNotFoundError: If file doesn't exist json.JSONDecodeError: If JSON is invalid """ try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) logger.info(f"Successfully loaded JSON data from {file_path}") return data except FileNotFoundError: logger.error(f"File not found: {file_path}") raise except json.JSONDecodeError as e: logger.error(f"Invalid JSON in {file_path}: {e}") raise except Exception as e: logger.error(f"Unexpected error loading {file_path}: {e}") raise def filter_lightning_data_by_date_range(lightning_df: pd.DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame: """ Filter lightning data by date range. Args: lightning_df: DataFrame containing lightning data with 'local_time' column start_date: Start date in format 'DD-MM-YYYY' or None for no filtering end_date: End date in format 'DD-MM-YYYY' or None for no filtering Returns: Filtered DataFrame containing only lightning data within the specified date range """ if start_date is None and end_date is None: return lightning_df def _parse_flexible_datetime(value: Optional[str], is_end: bool = False) -> Optional[datetime]: if value is None: return None value_str = str(value).strip() if not value_str: return None try: if re.fullmatch(r"\d{2}-\d{2}-\d{4} \d{2}:\d{2}", value_str): dt = datetime.strptime(value_str, '%d-%m-%Y %H:%M') if is_end: dt = dt.replace(second=59) return dt if re.fullmatch(r"\d{2}-\d{2}-\d{4}", value_str): dt = datetime.strptime(value_str, '%d-%m-%Y') if is_end: dt = dt.replace(hour=23, minute=59, second=59) return dt ts = pd.to_datetime(value_str, errors='raise') if isinstance(ts, pd.Timestamp): if ts.tzinfo is not None: ts = ts.tz_convert('UTC').tz_localize(None) return ts.to_pydatetime() except Exception as e: logger.error(f"Invalid datetime value: {value_str}. Error: {e}") return None return None df = lightning_df.copy() if df['local_time'].dtype == 'object': df['local_time'] = pd.to_datetime(df['local_time']) if df['local_time'].dt.tz is not None: from src.config import config tz_name = getattr(config, 'timezone', None) or 'UTC' df['local_time'] = df['local_time'].dt.tz_convert(tz_name).dt.tz_localize(None) start_dt = _parse_flexible_datetime(start_date, is_end=False) end_dt = _parse_flexible_datetime(end_date, is_end=True) if start_date and start_dt is None: logger.error(f"Invalid start_date value: {start_date}. Expected 'DD-MM-YYYY' or ISO datetime string.") return lightning_df if end_date and end_dt is None: logger.error(f"Invalid end_date value: {end_date}. Expected 'DD-MM-YYYY' or ISO datetime string.") return lightning_df # Apply date filtering if start_dt and end_dt: mask = (df['local_time'] >= start_dt) & (df['local_time'] <= end_dt) filtered_df = df[mask] logger.info(f"Filtered lightning data from {len(lightning_df)} to {len(filtered_df)} records ({start_date} to {end_date})") return filtered_df if start_dt: mask = df['local_time'] >= start_dt filtered_df = df[mask] logger.info(f"Filtered lightning data from {len(lightning_df)} to {len(filtered_df)} records (from {start_date})") return filtered_df if end_dt: mask = df['local_time'] <= end_dt filtered_df = df[mask] logger.info(f"Filtered lightning data from {len(lightning_df)} to {len(filtered_df)} records (until {end_date})") return filtered_df return df def validate_lightning_data(df: pd.DataFrame) -> bool: """ Validate lightning data structure and content. Args: df: Lightning DataFrame Returns: True if valid, False otherwise """ required_columns = ['lat', 'lng', 'current', 'p_type', 'local_time'] # Handle empty dataset gracefully if len(df) == 0: logger.warning("Lightning dataset is empty - this is acceptable for analysis") return True # Check required columns missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: logger.error(f"Missing required columns: {missing_columns}") return False # Check data types if not pd.api.types.is_numeric_dtype(df['lat']): logger.error("Latitude column must be numeric") return False if not pd.api.types.is_numeric_dtype(df['lng']): logger.error("Longitude column must be numeric") return False if not pd.api.types.is_numeric_dtype(df['current']): logger.error("Current column must be numeric") return False # Check coordinate ranges if not (df['lat'].between(-90, 90).all()): logger.error("Latitude values must be between -90 and 90") return False if not (df['lng'].between(-180, 180).all()): logger.error("Longitude values must be between -180 and 180") return False # Check p_type values valid_p_types = ['0', '1', 0, 1] invalid_p_types = df[~df['p_type'].astype(str).isin(['0', '1'])] if len(invalid_p_types) > 0: logger.warning(f"Found {len(invalid_p_types)} invalid p_type values") logger.info(f"Lightning data validation passed: {len(df)} records") return True def validate_turbine_data(df: pd.DataFrame) -> bool: """ Validate turbine data structure and content. Args: df: Turbine DataFrame Returns: True if valid, False otherwise """ required_columns = ['lat', 'lng', 'name'] # Check required columns missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: logger.error(f"Missing required columns: {missing_columns}") return False # Check data types if not pd.api.types.is_numeric_dtype(df['lat']): logger.error("Latitude column must be numeric") return False if not pd.api.types.is_numeric_dtype(df['lng']): logger.error("Longitude column must be numeric") return False # Check coordinate ranges if not (df['lat'].between(-90, 90).all()): logger.error("Latitude values must be between -90 and 90") return False if not (df['lng'].between(-180, 180).all()): logger.error("Longitude values must be between -180 and 180") return False logger.info(f"Turbine data validation passed: {len(df)} records") return True def ensure_datetime_column(df: pd.DataFrame, column: str) -> pd.DataFrame: """ Ensure a column contains datetime objects. Args: df: DataFrame column: Column name to convert Returns: DataFrame with converted datetime column """ # Handle empty DataFrame if len(df) == 0: return df if df[column].dtype == 'object': df = df.copy() df[column] = pd.to_datetime(df[column], errors='coerce') logger.info(f"Converted {column} to datetime") return df