# Every minute: snapshot node readiness, unhealthy pods and in-cluster
# endpoint probes, and ship them as one JSON batch to OpenObserve.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: nxtgauge-openobserve-k8s-monitor
  namespace: nxtgauge
spec:
  schedule: "*/1 * * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      # Hard cap per run. With concurrencyPolicy: Forbid, a run stuck on
      # something the urllib timeouts do not bound (e.g. DNS resolution) or
      # looping through OnFailure restarts would otherwise block all later
      # runs.
      activeDeadlineSeconds: 240
      template:
        spec:
          serviceAccountName: nxtgauge-openobserve-k8s-monitor
          restartPolicy: OnFailure
          containers:
            - name: k8s-monitor
              image: python:3.12-alpine
              imagePullPolicy: IfNotPresent
              envFrom:
                - secretRef:
                    name: nxtgauge-openobserve-endpoint-monitor-secret
              command: ["python", "-c"]
              args:
                - |
                  """Collect cluster health records and POST them to OpenObserve.

                  Environment (provided by the mounted secret):
                    OO_ENDPOINT     base URL of OpenObserve (required)
                    OO_AUTH_HEADER  value for the Authorization header (required)
                    OO_ORG          organisation name (optional, default "default")
                  """
                  import datetime
                  import json
                  import os
                  import ssl
                  import time
                  import urllib.error
                  import urllib.request

                  TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
                  CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
                  KUBE_API = "https://kubernetes.default.svc"
                  STREAM = "nxtgauge_k8s_health"

                  # Container "waiting" reasons that mark a pod as broken even when
                  # its phase is still Running (e.g. CrashLoopBackOff after a restart).
                  BAD_WAITING_REASONS = frozenset({
                      "CrashLoopBackOff",
                      "ImagePullBackOff",
                      "ErrImagePull",
                      "CreateContainerConfigError",
                      "CreateContainerError",
                      "InvalidImageName",
                      "RunContainerError",
                  })

                  # (record name, URL, extra HTTP statuses that still count as healthy).
                  # The registry answers /v2/ with 401 when auth is enabled, which
                  # still proves it is serving.
                  ENDPOINTS = [
                      ("frontend-svc", "http://nxtgauge-frontend-solid.nxtgauge.svc.cluster.local/", ()),
                      ("admin-svc", "http://nxtgauge-admin-solid.nxtgauge.svc.cluster.local/", ()),
                      ("api-gateway-svc", "http://nxtgauge-rust-gateway.nxtgauge.svc.cluster.local:9100/health", ()),
                      ("registry-svc", "http://docker-registry.registry.svc.cluster.local:5000/v2/", (401,)),
                      ("woodpecker-svc", "http://woodpecker-server.woodpecker.svc.cluster.local/", ()),
                      ("argocd-metrics", "http://argocd-server-metrics.argocd.svc.cluster.local:8083/metrics", ()),
                      ("openobserve-svc", "http://o2-openobserve-standalone.openobserve.svc.cluster.local:5080/healthz", ()),
                  ]


                  def utc_now_iso():
                      """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SSZ'."""
                      # datetime.utcnow() is deprecated since Python 3.12 (this image);
                      # an aware datetime formatted explicitly keeps the same 'Z' output.
                      return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


                  def kube_get(path, token, ctx):
                      """GET a Kubernetes API path with the service-account token; return parsed JSON."""
                      req = urllib.request.Request(
                          KUBE_API + path,
                          headers={"Authorization": f"Bearer {token}"},
                      )
                      with urllib.request.urlopen(req, context=ctx, timeout=20) as resp:
                          return json.loads(resp.read().decode("utf-8"))


                  def check_url(name, url, extra_ok_statuses=()):
                      """Probe url once and return an "endpoint" record.

                      ok is True for 2xx/3xx responses or for any status listed in
                      extra_ok_statuses. Connection-level failures leave status_code 0
                      and put the exception text in "error".
                      """
                      start = time.monotonic()  # monotonic: immune to wall-clock steps
                      status = 0
                      err = ""
                      try:
                          with urllib.request.urlopen(urllib.request.Request(url), timeout=15) as resp:
                              status = int(getattr(resp, "status", 0) or 0)
                      except urllib.error.HTTPError as e:
                          # urlopen raises for 4xx/5xx; the status code is still the signal.
                          status = int(getattr(e, "code", 0) or 0)
                          err = str(e)
                      except Exception as e:  # DNS failure, refused connection, timeout, ...
                          err = str(e)
                      ok = 200 <= status < 400 or status in extra_ok_statuses
                      return {
                          "kind": "endpoint",
                          "endpoint": name,
                          "url": url,
                          "status_code": status,
                          "ok": ok,
                          "latency_ms": int((time.monotonic() - start) * 1000),
                          "error": err,
                      }


                  def node_records(nodes, now):
                      """Return ("node" records for every node, number of Ready nodes)."""
                      records = []
                      for n in nodes:
                          conditions = (n.get("status") or {}).get("conditions") or []
                          conds = {c.get("type"): c.get("status") for c in conditions}
                          records.append(
                              {
                                  "kind": "node",
                                  "node": (n.get("metadata") or {}).get("name", "unknown"),
                                  "ready": conds.get("Ready") == "True",
                                  "memory_pressure": conds.get("MemoryPressure"),
                                  "disk_pressure": conds.get("DiskPressure"),
                                  "pid_pressure": conds.get("PIDPressure"),
                                  "network_unavailable": conds.get("NetworkUnavailable"),
                                  "checked_at": now,
                              }
                          )
                      ready_count = sum(1 for r in records if r["ready"])
                      return records, ready_count


                  def pod_records(pods, now):
                      """Return a "pod" record for each unhealthy pod.

                      A pod is unhealthy when its phase is Pending/Failed/Unknown or any
                      (init) container is waiting with a reason in BAD_WAITING_REASONS.
                      The last such container's reason/message replaces the pod-level one.
                      """
                      records = []
                      for p in pods:
                          meta = p.get("metadata") or {}
                          status = p.get("status") or {}
                          phase = status.get("phase", "")
                          reason = status.get("reason", "") or ""
                          message = status.get("message", "") or ""
                          crash = False
                          # Include init containers so Init:CrashLoopBackOff pods report
                          # their real reason instead of an empty one.
                          container_statuses = (status.get("initContainerStatuses") or []) + (
                              status.get("containerStatuses") or []
                          )
                          for cs in container_statuses:
                              waiting = (cs.get("state") or {}).get("waiting") or {}
                              if waiting.get("reason") in BAD_WAITING_REASONS:
                                  crash = True
                                  reason = waiting.get("reason") or reason
                                  # `or` guards against a null message breaking [:300].
                                  message = waiting.get("message") or message
                          if phase in ("Pending", "Failed", "Unknown") or crash:
                              records.append(
                                  {
                                      "kind": "pod",
                                      "namespace": meta.get("namespace", ""),
                                      "pod": meta.get("name", ""),
                                      "phase": phase,
                                      "reason": reason,
                                      "message": message[:300],
                                      "checked_at": now,
                                  }
                              )
                      return records


                  def ship(records, endpoint, org, auth_header):
                      """POST records as one JSON array to OpenObserve's _json ingest API."""
                      req = urllib.request.Request(
                          f"{endpoint}/api/{org}/{STREAM}/_json",
                          data=json.dumps(records).encode("utf-8"),
                          headers={
                              "Content-Type": "application/json",
                              "Authorization": auth_header,
                          },
                          method="POST",
                      )
                      # Any HTTP/URL error propagates and fails the job (retried per
                      # restartPolicy), so a broken ingest path is visible in the Job status.
                      with urllib.request.urlopen(req, timeout=30) as resp:
                          resp.read()
                          print(f"shipped {len(records)} records to {STREAM}: HTTP {resp.status}")


                  def main():
                      # Read required config first so a missing secret key fails fast,
                      # before spending time on API calls and probes.
                      oo_endpoint = os.environ["OO_ENDPOINT"].rstrip("/")
                      oo_org = os.environ.get("OO_ORG", "default")
                      auth_header = os.environ["OO_AUTH_HEADER"]

                      with open(TOKEN_PATH, "r", encoding="utf-8") as f:
                          token = f.read().strip()
                      kube_ctx = ssl.create_default_context(cafile=CA_PATH)

                      now = utc_now_iso()
                      nodes = kube_get("/api/v1/nodes", token, kube_ctx).get("items", [])
                      pods = kube_get("/api/v1/pods", token, kube_ctx).get("items", [])

                      records, ready_count = node_records(nodes, now)
                      issues = pod_records(pods, now)
                      records.extend(issues)

                      for name, url, extra_ok in ENDPOINTS:
                          rec = check_url(name, url, extra_ok)
                          rec["checked_at"] = now
                          records.append(rec)

                      records.append(
                          {
                              "kind": "cluster_summary",
                              "cluster": "nxtgauge",
                              "node_total": len(nodes),
                              "node_ready": ready_count,
                              "node_not_ready": len(nodes) - ready_count,
                              "pod_issues": len(issues),
                              "checked_at": now,
                          }
                      )

                      ship(records, oo_endpoint, oo_org, auth_header)


                  if __name__ == "__main__":
                      main()