#!/usr/bin/env python3 """ Registry Image Tag Pruner - Keeps only the latest 1 SHA-tag per repository. Usage: python3 registry_prune.py \ --registry registry.nxtgauge.com \ --repo nxtgauge-rust-gateway \ --username "$REGISTRY_USERNAME" \ --password "$REGISTRY_PASSWORD" Environment variables can also be used: REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD SHA-like tags are identified by pattern: ^[a-f0-9]{40}$ Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are NEVER deleted. Exit code: 0 on success (or if prune fails gracefully), non-zero only on critical error. """ import argparse import base64 import json import os import sys import time from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError def parse_args(): parser = argparse.ArgumentParser( description="Prune Docker registry tags, keeping only the latest SHA tag." ) parser.add_argument("--registry", default=os.environ.get("REGISTRY_HOST")) parser.add_argument("--repo", default=os.environ.get("REGISTRY_REPO")) parser.add_argument("--username", default=os.environ.get("REGISTRY_USERNAME")) parser.add_argument("--password", default=os.environ.get("REGISTRY_PASSWORD")) parser.add_argument("--keep", type=int, default=1, help="Number of SHA tags to keep (default: 1)") return parser.parse_args() def api_request(url: str, method: str, username: str, password: str, data=None, retries: int = 3) -> dict | None: """Make an authenticated API request with retry logic.""" auth = base64.b64encode(f"{username}:{password}".encode()).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/json", } for attempt in range(1, retries + 1): try: req = Request(url, method=method, headers=headers, data=data) with urlopen(req, timeout=30) as response: content = response.read() if content: return json.loads(content) return {} except HTTPError as e: if e.code == 401: print(f" [ERROR] Authentication failed (401)") return None if e.code == 404: print(f" [WARN] Resource not found: {url}") return None print(f" [RETRY {attempt}/{retries}] HTTP {e.code} for {url}") except URLError as e: print(f" [RETRY {attempt}/{retries}] URL error: {e.reason}") except Exception as e: print(f" [RETRY {attempt}/{retries}] Error: {e}") if attempt < retries: time.sleep(attempt * 2) print(f" [ERROR] Failed after {retries} attempts for {url}") return None def get_tag_digest(registry: str, repo: str, tag: str, username: str, password: str) -> tuple[str, str] | None: """Get the digest (sha256:...) and created time for a tag.""" url = f"https://{registry}/v2/{repo}/manifests/{tag}" auth = base64.b64encode(f"{username}:{password}".encode()).decode() for attempt in range(1, 4): try: req = Request(url, method="GET", headers={ "Authorization": f"Basic {auth}", "Accept": "application/vnd.docker.distribution.manifest.v2+json", }) with urlopen(req, timeout=30) as response: digest = response.headers.get("Docker-Content-Digest", "") created = response.headers.get("Date", "") return digest, created except Exception as e: print(f" [RETRY {attempt}/3] Getting digest for {tag}: {e}") time.sleep(attempt) return None def delete_tag(registry: str, repo: str, digest: str, username: str, password: str) -> bool: """Delete a tag by its digest.""" url = f"https://{registry}/v2/{repo}/manifests/{digest}" auth = base64.b64encode(f"{username}:{password}".encode()).decode() for attempt in range(1, 4): try: req = Request(url, method="DELETE", headers={ "Authorization": f"Basic {auth}", }) with urlopen(req, timeout=30) as response: if response.status in (200, 202, 404): return True except HTTPError as e: if e.code == 404: return True # Already deleted print(f" [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}") except Exception as e: print(f" [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}") time.sleep(attempt) return False def is_sha_tag(tag: str) -> bool: """Check if tag looks like a SHA (40 hex chars).""" import re return bool(re.match(r"^[a-f0-9]{40}$", tag)) def prune_tags(registry: str, repo: str, username: str, password: str, keep: int = 1) -> bool: """ Main prune logic: - List all tags for the repo - Filter SHA-like tags - Sort by created date (newest first) - Keep newest `keep` tags - Delete older SHA tags by digest - Never delete non-SHA tags """ print(f"\n=== Pruning {registry}/{repo} ===") print(f"Strategy: Keep {keep} newest SHA tag(s), delete older SHA tags") print(f"Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are preserved\n") # Get catalog (list of repos) catalog_url = f"https://{registry}/v2/_catalog" catalog = api_request(catalog_url, "GET", username, password) if catalog is None: print("[ERROR] Failed to get repository catalog") return False if repo not in catalog.get("repositories", []): print(f"[INFO] Repository {repo} not found in catalog") return True # Get tags for repo tags_url = f"https://{registry}/v2/{repo}/tags/list" tags_data = api_request(tags_url, "GET", username, password) if tags_data is None: print(f"[ERROR] Failed to get tags for {repo}") return False all_tags = tags_data.get("tags", []) if not all_tags: print("[INFO] No tags found") return True # Separate SHA tags from non-SHA tags sha_tags = [t for t in all_tags if is_sha_tag(t)] non_sha_tags = [t for t in all_tags if not is_sha_tag(t)] print(f"Total tags: {len(all_tags)}") print(f" SHA tags (candidates for pruning): {len(sha_tags)}") print(f" Non-SHA tags (protected): {len(non_sha_tags)}") if non_sha_tags: print(f" Protected tags: {', '.join(sorted(non_sha_tags))}") if not sha_tags: print("\n[INFO] No SHA tags to prune") return True # Get digest and created time for each SHA tag tag_info = [] for tag in sha_tags: result = get_tag_digest(registry, repo, tag, username, password) if result: digest, created = result tag_info.append({ "tag": tag, "digest": digest, "created": created, "timestamp": parse_http_date(created) if created else 0, }) time.sleep(0.1) # Be nice to the registry if not tag_info: print("\n[ERROR] Could not get info for any SHA tags") return False # Sort by timestamp (newest first) tag_info.sort(key=lambda x: x["timestamp"], reverse=True) print(f"\nSHA tags sorted by age (newest first):") for i, info in enumerate(tag_info): marker = " [KEEP]" if i < keep else " [DELETE]" print(f" {i+1}. {info['tag']} ({info['created'] or 'unknown date'}){marker}") # Delete older SHA tags deleted_count = 0 kept_count = 0 for i, info in enumerate(tag_info): if i < keep: print(f"\n[KEEP] {info['tag']}") kept_count += 1 continue print(f"\n[DELETE] {info['tag']} (digest: {info['digest'][:20]}...)") if delete_tag(registry, repo, info["digest"], username, password): print(f" [OK] Deleted {info['tag']}") deleted_count += 1 else: print(f" [WARN] Failed to delete {info['tag']} (will retry next run)") time.sleep(0.2) # Be nice to the registry print(f"\n=== Prune Summary ===") print(f"Tags kept: {kept_count}") print(f"Tags deleted: {deleted_count}") print(f"Tags protected (non-SHA): {len(non_sha_tags)}") return True def parse_http_date(date_str: str) -> float: """Parse HTTP Date header to timestamp.""" from email.utils import parsedate_to_datetime try: return parsedate_to_datetime(date_str).timestamp() except Exception: return 0 def main(): args = parse_args() # Validate required args registry = args.registry or os.environ.get("REGISTRY_HOST") repo = args.repo or os.environ.get("REGISTRY_REPO") username = args.username or os.environ.get("REGISTRY_USERNAME") password = args.password or os.environ.get("REGISTRY_PASSWORD") if not all([registry, repo, username, password]): print("[ERROR] Missing required arguments. Need: --registry, --repo, --username, --password") print("Or set environment variables: REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD") sys.exit(1) print(f"Registry: {registry}") print(f"Repository: {repo}") print(f"Username: {username}") try: success = prune_tags(registry, repo, username, password, args.keep) if success: print("\n[OK] Prune completed successfully") sys.exit(0) else: print("\n[WARN] Prune completed with some errors") sys.exit(0) # Exit 0 per requirement - never fail workflow except Exception as e: print(f"\n[ERROR] Prune failed with exception: {e}") sys.exit(0) # Exit 0 per requirement - never fail workflow if __name__ == "__main__": main()