"""Provision Uptime Kuma from ``config/monitors.yml``.

The script creates or updates notification providers, group monitors,
push/HTTP/DNS/ping monitors and status pages, then writes the push tokens
of the push monitors to ``config/generated/uk_tokens.yml``.
"""
import argparse
import logging
import os

import yaml
from dotenv import load_dotenv
from uptime_kuma_api import UptimeKumaApi, MonitorType, NotificationType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("uk-setup")

# Repository root relative to this script; .env files and config/ live there.
_root = os.path.join(os.path.dirname(__file__), "..")
load_dotenv(os.path.join(_root, ".env"))
load_dotenv(os.path.join(_root, ".env.setup"))


def format_str(text, env_name, project):
    """Replace the ``{env}`` and ``{project}`` placeholders in *text*.

    Non-string values are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return text.replace("{env}", env_name).replace("{project}", project)


def resolve_template(text, suffix, domain):
    """Replace the ``{suffix}`` and ``{domain}`` placeholders in *text*.

    Non-string values are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return text.replace("{suffix}", suffix).replace("{domain}", domain)


def find_parent_group(monitor_name, groups, group_map):
    """Return the monitor ID of the first group listing *monitor_name* as a child.

    Args:
        monitor_name: Name of the monitor to look up.
        groups: Group entries from the config; each may carry ``children``.
        group_map: Mapping of raw group name -> group monitor ID.

    Returns:
        The mapped group ID, or ``None`` when no group lists the monitor or
        the owning group has no entry in *group_map*.
    """
    for g in groups:
        if monitor_name in g.get("children", []):
            return group_map.get(g["name"])
    return None


def _notification_ids(group, notification_map):
    """Return the IDs of the group's notifications that are present in *notification_map*."""
    return [
        notification_map[n]
        for n in group.get("notifications", [])
        if notification_map.get(n) is not None
    ]


def find_group_notifications(monitor_name, groups, notification_map):
    """Return the notification IDs of the first group listing *monitor_name* as a child.

    Args:
        monitor_name: Name of the monitor to look up.
        groups: Group entries from the config; each may carry ``children``
            and ``notifications``.
        notification_map: Mapping of notification key -> notification ID.

    Returns:
        A non-empty list of IDs, or ``None`` when the monitor belongs to no
        group or its group resolves to no known notification.
    """
    for g in groups:
        if monitor_name in g.get("children", []):
            return _notification_ids(g, notification_map) or None
    return None


def _index_by(fetch, key, what, log):
    """Call *fetch* and index the returned dicts by ``item[key]``.

    A failure is reported through *log* as ``Failed to get <what>: ...``;
    whatever was indexed before the failure is still returned.
    """
    indexed = {}
    try:
        for item in fetch():
            indexed[item[key]] = item
    except Exception as e:
        log(f"Failed to get {what}: {e}")
    return indexed


def _connect():
    """Connect and log in using UK_URL / UK_USER / UK_PASS; return ``None`` on failure."""
    kuma_url = os.getenv("UK_URL", "http://localhost:3001")
    kuma_user = os.getenv("UK_USER", "admin")
    kuma_pass = os.getenv("UK_PASS", "admin")

    logger.info(f"Connecting to Uptime Kuma at {kuma_url}...")
    api = None
    try:
        api = UptimeKumaApi(kuma_url)
        api.login(kuma_user, kuma_pass)
    except Exception as e:
        logger.error(f"Login failed: {e}")
        # The client may already be connected when only the login step failed.
        if api is not None:
            api.disconnect()
        return None
    return api


def _sync_notifications(api, config, project):
    """Create missing notification providers; return notification key -> ID.

    With ``api`` set to ``None`` (dry run) only the processing lines are logged.
    """
    notification_map = {}
    existing_notifications = {}
    if api is not None:
        existing_notifications = _index_by(
            api.get_notifications, "name", "notifications", logger.warning
        )

    for notif_key, notif_cfg in config.get("notifications", {}).items():
        webhook_env = notif_cfg.get("webhook_env")
        webhook_url = os.getenv(webhook_env, "") if webhook_env else ""
        notif_name = f"{project}-{notif_key}"
        logger.info(f"Processing notification: {notif_name}")
        if api is None:
            continue

        if notif_name in existing_notifications:
            notification_map[notif_key] = existing_notifications[notif_name]['id']
            logger.info(f"Notification {notif_name} already exists (id={notification_map[notif_key]})")
        elif webhook_url:
            try:
                res = api.add_notification(
                    type=NotificationType.SLACK,
                    name=notif_name,
                    isDefault=False,
                    slackwebhookURL=webhook_url,
                    applyExisting=False,
                )
                notification_map[notif_key] = res.get('id')
                logger.info(f"Created notification: {notif_name}")
            except Exception as e:
                logger.warning(f"Failed to create notification {notif_name}: {e}")
        else:
            logger.warning(f"Skipping {notif_name}: env var {webhook_env} is not set")
    return notification_map


def _sync_groups(api, groups, existing_monitors, notification_map, project, env_name):
    """Create missing group monitors; return raw group name -> group monitor ID.

    Group monitors are named ``"<project> [<env>] <raw name>"``. A failing
    ``add_monitor`` call is not caught here and aborts the run.
    """
    group_map = {}
    for g in groups:
        raw_name = g["name"]
        formatted_name = f"{project} [{env_name}] {raw_name}"
        notif_ids = _notification_ids(g, notification_map)
        logger.info(f"Processing group: {formatted_name}")
        if api is None:
            continue

        if formatted_name in existing_monitors:
            group_map[raw_name] = existing_monitors[formatted_name]['id']
            continue

        logger.info(f"Creating group monitor: {formatted_name}")
        kwargs = {"type": MonitorType.GROUP, "name": formatted_name}
        if notif_ids:
            kwargs["notificationIDList"] = notif_ids
        res = api.add_monitor(**kwargs)
        group_map[raw_name] = res['monitorID']
    return group_map


def _sync_push_monitors(api, config, groups, group_map, notification_map, existing_monitors, only):
    """Create or update push monitors; return monitor name -> push token.

    With ``api`` set to ``None`` (dry run) every processed monitor gets a
    placeholder token. A failing ``add_monitor`` call is not caught here and
    aborts the run.
    """
    tokens = {}
    new_monitor_ids = {}  # m_name -> monitorID for monitors created in this run

    for pm in config.get("push_monitors", []):
        m_name = pm["name"]
        if only and m_name != only:
            continue
        m_interval = pm.get("interval", 60)
        parent_group_id = find_parent_group(m_name, groups, group_map)
        notif_ids = find_group_notifications(m_name, groups, notification_map)
        logger.info(f"Processing push monitor: {m_name}")

        if api is None:
            tokens[m_name] = "dummy_token_dry_run"
            continue

        if m_name in existing_monitors:
            logger.info(f"Monitor {m_name} already exists. Updating...")
            m_id = existing_monitors[m_name]['id']
            tokens[m_name] = existing_monitors[m_name]['pushToken']
            kwargs = {"interval": m_interval}
            if parent_group_id is not None:
                kwargs["parent"] = parent_group_id
            if notif_ids:
                kwargs["notificationIDList"] = notif_ids
            try:
                api.edit_monitor(m_id, **kwargs)
            except Exception as e:
                logger.warning(f"Failed to edit push monitor {m_name}: {e}")
        else:
            logger.info(f"Creating push monitor: {m_name}")
            kwargs = {
                "type": MonitorType.PUSH,
                "name": m_name,
                "interval": m_interval,
                "parent": parent_group_id,
            }
            if notif_ids:
                kwargs["notificationIDList"] = notif_ids
            result = api.add_monitor(**kwargs)
            new_monitor_ids[m_name] = result['monitorID']

    # Fetch push tokens for newly created monitors in one batch call.
    # Calling api.get_monitors() per-monitor races with WebSocket event delivery;
    # a single call after all creates allows the server state to settle.
    if new_monitor_ids and api is not None:
        id_to_name = {v: k for k, v in new_monitor_ids.items()}
        for m in api.get_monitors():
            if m['id'] in id_to_name:
                m_name = id_to_name[m['id']]
                tokens[m_name] = m.get('pushToken', '')
                logger.info(f"Captured push token for {m_name}")
        missing = [n for n in new_monitor_ids if n not in tokens]
        if missing:
            logger.warning(f"Could not capture push token for: {missing}")
    return tokens


def _upsert_monitor(api, label, target, fields, groups, group_map, notification_map, existing_monitors):
    """Edit the monitor named ``fields["name"]`` if it exists, otherwise create it.

    Args:
        api: Connected client, or ``None`` in dry-run mode (log only).
        label: Monitor kind used in log messages ("HTTP", "DNS", "Ping").
        target: URL / hostname / IP shown in the processing log line.
        fields: Keyword arguments for the monitor, including ``type`` and ``name``.
        groups: Group entries from the config.
        group_map: Mapping of raw group name -> group monitor ID.
        notification_map: Mapping of notification key -> notification ID.
        existing_monitors: Monitors already on the server, indexed by name.

    API failures are logged as warnings and do not abort the run.
    """
    m_name = fields["name"]
    parent_group_id = find_parent_group(m_name, groups, group_map)
    notif_ids = find_group_notifications(m_name, groups, notification_map)
    logger.info(f"Processing {label} monitor: {m_name} -> {target}")
    if api is None:
        return

    kwargs = dict(fields)
    if parent_group_id is not None:
        kwargs["parent"] = parent_group_id
    if notif_ids:
        kwargs["notificationIDList"] = notif_ids

    if m_name in existing_monitors:
        logger.info(f"Monitor {m_name} already exists. Updating...")
        m_id = existing_monitors[m_name]['id']
        try:
            api.edit_monitor(m_id, **kwargs)
        except Exception as e:
            logger.warning(f"Failed to edit {label} monitor {m_name}: {e}")
    else:
        try:
            api.add_monitor(**kwargs)
            logger.info(f"Created {label} monitor: {m_name}")
        except Exception as e:
            logger.warning(f"Failed to create {label} monitor {m_name}: {e}")


def _sync_status_pages(api, config, group_map, env_name, project):
    """Create missing status pages and save their group layout."""
    existing_pages = _index_by(api.get_status_pages, "slug", "status pages", logger.warning)

    for sp in config.get("status_pages", []):
        slug = format_str(sp["slug"], env_name, project)
        title = format_str(sp["title"], env_name, project)
        is_public = sp.get("public", False)
        sp_groups = sp.get("groups", [])
        logger.info(f"Processing status page: {title} (slug: {slug})")

        try:
            if slug not in existing_pages:
                logger.info(f"Creating status page: {slug}")
                api.add_status_page(slug, title)

            # Each monitors.yml group becomes one display section on the status page.
            # Use group_map (populated while processing groups) to avoid re-fetching
            # monitors; a fresh get_monitors() call after add_monitor() races with
            # WebSocket delivery.
            public_group_list = []
            for group_raw_name in sp_groups:
                group_id = group_map.get(group_raw_name)
                if not group_id:
                    logger.warning(f"Group '{group_raw_name}' not in group_map, skipping in status page")
                    continue
                public_group_list.append({
                    "name": group_raw_name,
                    "weight": len(public_group_list) + 1,
                    "monitorList": [{"id": group_id}],
                })

            if public_group_list:
                api.save_status_page(
                    slug=slug,
                    title=title,
                    publicGroupList=public_group_list,
                    published=is_public,
                )
                logger.info(f"Saved status page '{slug}' with {len(public_group_list)} group(s)")
        except Exception as e:
            logger.warning(f"Status page ops failed for {slug}: {e}")


def _write_tokens(tokens, dry_run):
    """Write *tokens* to ``config/generated/uk_tokens.yml`` (only logged in dry-run mode)."""
    token_file = os.path.join(_root, "config", "generated", "uk_tokens.yml")
    if dry_run:
        logger.info(f"[DRY-RUN] Would save {len(tokens)} tokens to {token_file}")
        return
    if not tokens:
        logger.warning("No push tokens captured; skipping uk_tokens.yml write so setup reruns next time")
        return
    # NOTE(review): the file is overwritten, so a run with --only that matches a
    # push monitor leaves just that one token in it - confirm this is intended.
    os.makedirs(os.path.dirname(token_file), exist_ok=True)
    with open(token_file, "w", encoding="utf-8") as f:
        yaml.dump(tokens, f)
    logger.info(f"Saved {len(tokens)} push tokens to {token_file}")


def setup_uptime_kuma(dry_run=False, only=None):
    """Apply ``config/monitors.yml`` to an Uptime Kuma instance.

    Args:
        dry_run: When true, no connection is made; the planned actions are
            only logged and placeholder push tokens are used.
        only: When set, push/HTTP/DNS/ping monitors whose name differs are
            skipped. Notifications, groups and status pages are still processed.

    Returns:
        ``None``. A failed connection or login is logged and ends the run early.

    Environment variables read: ``ENV``, ``EXTERNAL_DOMAIN``,
    ``EXTERNAL_SUBDOMAIN_SUFFIX``, ``UK_URL``, ``UK_USER``, ``UK_PASS`` and the
    variables named by each notification's ``webhook_env``.
    """
    env_name = os.getenv("ENV", "test")
    config_path = os.path.join(_root, "config", "monitors.yml")
    with open(config_path, "r", encoding="utf-8") as f:
        # safe_load yields None for an empty document; treat that as "no config".
        config = yaml.safe_load(f) or {}

    project = config.get("project", "iklim")
    domain = os.getenv("EXTERNAL_DOMAIN", config.get("domain", {}).get("base", "iklim.co"))
    suffix = os.getenv("EXTERNAL_SUBDOMAIN_SUFFIX", "")
    groups = config.get("groups", [])

    api = None
    if not dry_run:
        api = _connect()
        if api is None:
            return

    try:
        existing_monitors = {}
        if api is not None:
            existing_monitors = _index_by(api.get_monitors, "name", "monitors", logger.error)

        # 0. Notification Providers
        notification_map = _sync_notifications(api, config, project)

        # 1. Groups
        group_map = _sync_groups(api, groups, existing_monitors, notification_map, project, env_name)

        # 2. Push Monitors
        tokens = _sync_push_monitors(
            api, config, groups, group_map, notification_map, existing_monitors, only
        )

        # 3. HTTP Monitors
        for hm in config.get("http_monitors", []):
            m_name = hm["name"]
            if only and m_name != only:
                continue
            url = resolve_template(hm["url"], suffix, domain)
            fields = {
                "type": MonitorType.HTTP,
                "name": m_name,
                "url": url,
                "interval": hm.get("interval", 60),
                "accepted_statuscodes": hm.get("accepted_statuscodes", ["200"]),
            }
            _upsert_monitor(
                api, "HTTP", url, fields, groups, group_map, notification_map, existing_monitors
            )

        # 4. DNS Monitors
        for dm in config.get("dns_monitors", []):
            m_name = dm["name"]
            if only and m_name != only:
                continue
            hostname = resolve_template(dm["hostname"], suffix, domain)
            fields = {
                "type": MonitorType.DNS,
                "name": m_name,
                "hostname": hostname,
                "port": 53,
                "accepted_statuscodes": ["200-299"],
                "dns_resolve_server": "1.1.1.1",
                "dns_resolve_type": dm.get("dns_resolve_type", "A"),
                "interval": dm.get("interval", 60),
                "url": "https://",
            }
            _upsert_monitor(
                api, "DNS", hostname, fields, groups, group_map, notification_map, existing_monitors
            )

        # 5. Ping Monitors (generated from nodes config)
        ping_cfg = config.get("ping_monitors", {})
        ping_interval = ping_cfg.get("interval", 60)
        ping_retries = ping_cfg.get("max_retries", 1)
        env_nodes = config.get("nodes", {}).get(env_name, {})
        for node_kind, name_part in (("service", "App"), ("db", "Db")):
            for i, node in enumerate(env_nodes.get(node_kind, []), 1):
                m_name = f"Ext Ping {name_part}{i:02d}"
                if only and m_name != only:
                    continue
                ip = node["ip"]
                fields = {
                    "type": MonitorType.PING,
                    "name": m_name,
                    "hostname": ip,
                    "interval": ping_interval,
                    "maxretries": ping_retries,
                }
                _upsert_monitor(
                    api, "Ping", ip, fields, groups, group_map, notification_map, existing_monitors
                )

        # 6. Status Pages
        if api is not None:
            _sync_status_pages(api, config, group_map, env_name, project)

        # 7. Write push tokens to uk_tokens.yml
        _write_tokens(tokens, dry_run)
    finally:
        # Always release the connection, including when a section above raised.
        if api is not None:
            api.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Setup Uptime Kuma monitors")
    parser.add_argument("--dry-run", action="store_true", help="Print actions without making changes")
    parser.add_argument("--only", type=str, help="Only process a specific monitor by name")
    args = parser.parse_args()
    setup_uptime_kuma(dry_run=args.dry_run, only=args.only)