diff --git a/.gitea/scripts/registry_prune.py b/.gitea/scripts/registry_prune.py
new file mode 100644
index 0000000..12c40ec
--- /dev/null
+++ b/.gitea/scripts/registry_prune.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python3
+"""
+Registry Image Tag Pruner - Keeps only the newest SHA tag(s) per repository.
+
+Usage:
+    python3 registry_prune.py \
+        --registry registry.nxtgauge.com \
+        --repo nxtgauge-rust-gateway \
+        --username "$REGISTRY_USERNAME" \
+        --password "$REGISTRY_PASSWORD"
+
+Environment variables can also be used:
+    REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD
+
+SHA-like tags are identified by pattern: ^[a-f0-9]{40}$
+Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are NEVER deleted.
+
+SHA tags are ordered by the "created" field of their image config. Deleting a
+manifest digest removes every tag that points at it, so digests shared with a
+kept or non-SHA tag are never deleted.
+
+Exit code: 0 on success (or if prune fails gracefully), non-zero only on critical error.
+"""
+
+import argparse
+import base64
+import json
+import os
+import re
+import sys
+import time
+from datetime import datetime
+from email.utils import parsedate_to_datetime
+from urllib.error import HTTPError, URLError
+from urllib.request import Request, urlopen
+
+SHA_TAG_RE = re.compile(r"[a-f0-9]{40}")
+ISO_TIME_RE = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?(Z|[+-]\d{2}:\d{2})?")
+
+# Accept every manifest flavour so the registry returns the manifest the tag
+# really points at; the digest of that manifest is what DELETE needs.
+MANIFEST_ACCEPT = ", ".join([
+    "application/vnd.oci.image.index.v1+json",
+    "application/vnd.oci.image.manifest.v1+json",
+    "application/vnd.docker.distribution.manifest.list.v2+json",
+    "application/vnd.docker.distribution.manifest.v2+json",
+])
+
+
+def parse_args():
+    """Parse CLI options; connection options default to the REGISTRY_* environment variables."""
+    parser = argparse.ArgumentParser(
+        description="Prune Docker registry tags, keeping only the latest SHA tag."
+    )
+    parser.add_argument("--registry", default=os.environ.get("REGISTRY_HOST"))
+    parser.add_argument("--repo", default=os.environ.get("REGISTRY_REPO"))
+    parser.add_argument("--username", default=os.environ.get("REGISTRY_USERNAME"))
+    parser.add_argument("--password", default=os.environ.get("REGISTRY_PASSWORD"))
+    parser.add_argument("--keep", type=int, default=1, help="Number of SHA tags to keep (default: 1)")
+    return parser.parse_args()
+
+
+def _basic_auth(username: str, password: str) -> str:
+    """Return the value of an HTTP Basic Authorization header."""
+    token = base64.b64encode(f"{username}:{password}".encode()).decode()
+    return f"Basic {token}"
+
+
+def api_request(url: str, method: str, username: str, password: str, data=None, retries: int = 3) -> dict | None:
+    """Make an authenticated JSON API request with retry logic.
+
+    Returns the decoded JSON body ({} when the body is empty), or None on
+    401, 404, or when every attempt failed.
+    """
+    headers = {
+        "Authorization": _basic_auth(username, password),
+        "Content-Type": "application/json",
+    }
+
+    for attempt in range(1, retries + 1):
+        try:
+            req = Request(url, method=method, headers=headers, data=data)
+            with urlopen(req, timeout=30) as response:
+                content = response.read()
+            return json.loads(content) if content else {}
+        except HTTPError as e:
+            if e.code == 401:
+                print("  [ERROR] Authentication failed (401)")
+                return None
+            if e.code == 404:
+                print(f"  [WARN] Resource not found: {url}")
+                return None
+            print(f"  [RETRY {attempt}/{retries}] HTTP {e.code} for {url}")
+        except URLError as e:
+            print(f"  [RETRY {attempt}/{retries}] URL error: {e.reason}")
+        except Exception as e:
+            print(f"  [RETRY {attempt}/{retries}] Error: {e}")
+
+        if attempt < retries:
+            time.sleep(attempt * 2)
+
+    print(f"  [ERROR] Failed after {retries} attempts for {url}")
+    return None
+
+
+def _fetch_manifest(registry: str, repo: str, reference: str, username: str, password: str) -> tuple[str, dict] | None:
+    """Fetch a manifest by tag or digest; return (digest, manifest) or None on failure."""
+    url = f"https://{registry}/v2/{repo}/manifests/{reference}"
+    headers = {"Authorization": _basic_auth(username, password), "Accept": MANIFEST_ACCEPT}
+
+    for attempt in range(1, 4):
+        try:
+            with urlopen(Request(url, method="GET", headers=headers), timeout=30) as response:
+                digest = response.headers.get("Docker-Content-Digest", "")
+                manifest = json.loads(response.read())
+            if not digest or not isinstance(manifest, dict):
+                raise ValueError("response carries no digest or no manifest object")
+            return digest, manifest
+        except HTTPError as e:
+            if e.code in (401, 404):
+                print(f"  [WARN] HTTP {e.code} getting manifest for {reference}")
+                return None
+            print(f"  [RETRY {attempt}/3] Getting digest for {reference}: {e}")
+        except Exception as e:
+            print(f"  [RETRY {attempt}/3] Getting digest for {reference}: {e}")
+
+        if attempt < 3:
+            time.sleep(attempt)
+
+    return None
+
+
+def get_tag_digest(registry: str, repo: str, tag: str, username: str, password: str) -> tuple[str, str] | None:
+    """Get the digest (sha256:...) and image creation time for a tag.
+
+    The creation time is the "created" string of the image config blob, or ""
+    when it cannot be determined. The HTTP Date response header is not used
+    because it is the time of the request, not of the image build.
+    Returns None when the manifest itself cannot be fetched.
+    """
+    resolved = _fetch_manifest(registry, repo, tag, username, password)
+    if resolved is None:
+        return None
+    digest, manifest = resolved
+
+    if "config" not in manifest:
+        # Multi-platform index: use the first entry that is a real image
+        # (buildx attestation entries are published with os "unknown").
+        child_digest = next(
+            (
+                entry["digest"]
+                for entry in manifest.get("manifests") or []
+                if entry.get("digest") and (entry.get("platform") or {}).get("os") != "unknown"
+            ),
+            None,
+        )
+        child = _fetch_manifest(registry, repo, child_digest, username, password) if child_digest else None
+        if child is None:
+            return digest, ""
+        manifest = child[1]
+
+    config_digest = (manifest.get("config") or {}).get("digest")
+    if not config_digest:
+        return digest, ""
+    config = api_request(f"https://{registry}/v2/{repo}/blobs/{config_digest}", "GET", username, password)
+    created = config.get("created") if isinstance(config, dict) else None
+    return digest, (created if isinstance(created, str) else "")
+
+
+def delete_tag(registry: str, repo: str, digest: str, username: str, password: str) -> bool:
+    """Delete a manifest by digest; every tag pointing at that digest goes with it.
+
+    Returns True when the manifest is gone (deleted now or already absent).
+    """
+    if not digest:
+        # Without a digest the URL would not name a manifest at all.
+        return False
+
+    url = f"https://{registry}/v2/{repo}/manifests/{digest}"
+    headers = {"Authorization": _basic_auth(username, password)}
+
+    for attempt in range(1, 4):
+        try:
+            with urlopen(Request(url, method="DELETE", headers=headers), timeout=30):
+                return True  # urlopen raises HTTPError for error statuses
+        except HTTPError as e:
+            if e.code == 404:
+                return True  # Already deleted
+            if e.code in (401, 405):
+                # Retrying cannot help: bad credentials or deletes disabled on the registry.
+                print(f"  [ERROR] Registry refused to delete {digest[:20]}...: HTTP {e.code}")
+                return False
+            print(f"  [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}")
+        except Exception as e:
+            print(f"  [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}")
+
+        if attempt < 3:
+            time.sleep(attempt)
+
+    return False
+
+
+def is_sha_tag(tag: str) -> bool:
+    """Check if tag looks like a git commit SHA (exactly 40 lowercase hex chars)."""
+    return SHA_TAG_RE.fullmatch(tag) is not None
+
+
+def parse_http_date(date_str: str) -> float:
+    """Parse HTTP Date header to timestamp (0 when it cannot be parsed)."""
+    try:
+        return parsedate_to_datetime(date_str).timestamp()
+    except Exception:
+        return 0
+
+
+def parse_created(value: str) -> float:
+    """Parse an image config "created" value to a timestamp (0 when unknown).
+
+    The value is an RFC 3339 string that may carry nanoseconds; the fraction is
+    cut to microseconds, the most datetime.fromisoformat accepts on every
+    supported Python version.
+    """
+    match = ISO_TIME_RE.fullmatch(value.strip())
+    if match is None:
+        return parse_http_date(value)
+    base, fraction, zone = match.groups()
+    fraction = (fraction or "0")[:6].ljust(6, "0")
+    zone = "+00:00" if zone in (None, "Z") else zone
+    try:
+        return datetime.fromisoformat(f"{base}.{fraction}{zone}").timestamp()
+    except ValueError:
+        return 0
+
+
+def prune_tags(registry: str, repo: str, username: str, password: str, keep: int = 1) -> bool:
+    """
+    Main prune logic:
+    - List all tags for the repo
+    - Resolve the digest of every non-SHA tag (those digests are protected)
+    - Sort SHA-like tags by image creation time (newest first)
+    - Keep newest `keep` tags
+    - Delete older SHA tags by digest, skipping digests shared with a kept
+      or non-SHA tag
+    - Leave SHA tags with an unknown creation time untouched
+
+    Returns False when pruning could not be done safely, True otherwise
+    (single delete failures are reported and retried on the next run).
+    """
+    keep = max(keep, 0)
+    print(f"\n=== Pruning {registry}/{repo} ===")
+    print(f"Strategy: Keep {keep} newest SHA tag(s), delete older SHA tags")
+    print("Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are preserved\n")
+
+    # /v2/_catalog is not consulted: it is paginated, so a membership test
+    # against one page can miss a repository that exists.
+    tags_data = api_request(f"https://{registry}/v2/{repo}/tags/list", "GET", username, password)
+    if tags_data is None:
+        print(f"[ERROR] Failed to get tags for {repo}")
+        return False
+
+    all_tags = tags_data.get("tags") or []
+    if not all_tags:
+        print("[INFO] No tags found")
+        return True
+
+    sha_tags = [t for t in all_tags if is_sha_tag(t)]
+    non_sha_tags = [t for t in all_tags if not is_sha_tag(t)]
+
+    print(f"Total tags: {len(all_tags)}")
+    print(f"  SHA tags (candidates for pruning): {len(sha_tags)}")
+    print(f"  Non-SHA tags (protected): {len(non_sha_tags)}")
+    if non_sha_tags:
+        print(f"  Protected tags: {', '.join(sorted(non_sha_tags))}")
+
+    if len(sha_tags) <= keep:
+        print("\n[INFO] No SHA tags to prune")
+        return True
+
+    # Maps each digest that must survive to the tag that protects it.
+    protected = {}
+    for tag in non_sha_tags:
+        resolved = _fetch_manifest(registry, repo, tag, username, password)
+        if resolved is None:
+            print(f"\n[ERROR] Could not resolve protected tag {tag}; refusing to delete anything")
+            return False
+        protected[resolved[0]] = tag
+        time.sleep(0.1)  # Be nice to the registry
+
+    tag_info = []
+    for tag in sha_tags:
+        result = get_tag_digest(registry, repo, tag, username, password)
+        timestamp = parse_created(result[1]) if result else 0
+        if timestamp > 0:
+            tag_info.append({"tag": tag, "digest": result[0], "created": result[1], "timestamp": timestamp})
+        else:
+            # Age unknown: never treat the tag as "oldest" and delete it by accident.
+            print(f"  [WARN] Unknown creation time for {tag}; leaving it untouched")
+            if result:
+                protected.setdefault(result[0], tag)
+        time.sleep(0.1)  # Be nice to the registry
+
+    if not tag_info:
+        print("\n[ERROR] Could not get info for any SHA tags")
+        return False
+
+    # Sort by timestamp (newest first)
+    tag_info.sort(key=lambda x: x["timestamp"], reverse=True)
+    kept, candidates = tag_info[:keep], tag_info[keep:]
+    for info in kept:
+        protected.setdefault(info["digest"], info["tag"])
+
+    print("\nSHA tags sorted by age (newest first):")
+    for i, info in enumerate(tag_info, start=1):
+        marker = " [KEEP]" if i <= keep else " [DELETE]"
+        print(f"  {i}. {info['tag']} ({info['created']}){marker}")
+
+    deleted_count = 0
+    for info in candidates:
+        holder = protected.get(info["digest"])
+        if holder is not None:
+            print(f"\n[SKIP] {info['tag']} shares its digest with {holder}")
+            continue
+
+        print(f"\n[DELETE] {info['tag']} (digest: {info['digest'][:20]}...)")
+        if delete_tag(registry, repo, info["digest"], username, password):
+            print(f"  [OK] Deleted {info['tag']}")
+            deleted_count += 1
+        else:
+            print(f"  [WARN] Failed to delete {info['tag']} (will retry next run)")
+
+        time.sleep(0.2)  # Be nice to the registry
+
+    print("\n=== Prune Summary ===")
+    print(f"Tags kept: {len(kept)}")
+    print(f"Tags deleted: {deleted_count}")
+    print(f"Tags protected (non-SHA): {len(non_sha_tags)}")
+
+    return True
+
+
+def main():
+    """Validate the options, run the prune, and exit 0 whatever the prune result is."""
+    args = parse_args()
+
+    # argparse already falls back to the REGISTRY_* environment variables.
+    registry, repo = args.registry, args.repo
+    username, password = args.username, args.password
+
+    if not all([registry, repo, username, password]):
+        print("[ERROR] Missing required arguments. Need: --registry, --repo, --username, --password")
+        print("Or set environment variables: REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD")
+        sys.exit(1)
+
+    print(f"Registry: {registry}")
+    print(f"Repository: {repo}")
+    print(f"Username: {username}")
+
+    try:
+        success = prune_tags(registry, repo, username, password, args.keep)
+    except Exception as e:
+        print(f"\n[ERROR] Prune failed with exception: {e}")
+        sys.exit(0)  # Exit 0 per requirement - never fail workflow
+
+    if success:
+        print("\n[OK] Prune completed successfully")
+    else:
+        print("\n[WARN] Prune completed with some errors")
+    sys.exit(0)  # Exit 0 per requirement - never fail workflow
+
+
+if __name__ == "__main__":
+    main()
diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml
index e45b0ce..e020445 100644
--- a/.gitea/workflows/build.yaml
+++ b/.gitea/workflows/build.yaml
@@ -44,3 +44,16 @@ jobs:
             -t "$REGISTRY_HOSTPORT/nxtgauge-ai-assistant:${{ gitea.sha }}" \
             -t "$REGISTRY_HOSTPORT/nxtgauge-ai-assistant:main-latest" \
             .
+
+      - name: Prune old image tags (keep latest 1 SHA)
+        if: success()
+        continue-on-error: true
+        env:
+          REGISTRY_HOST: ${{ secrets.REGISTRY_HOSTPORT }}
+          REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
+          REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
+        run: |
+          set -euo pipefail
+          python3 .gitea/scripts/registry_prune.py \
+            --repo "nxtgauge-ai-assistant" \
+            --keep 1