ci: add post-push registry prune (keep latest 1 SHA build)

This commit is contained in:
Tracewebstudio Dev 2026-05-01 10:10:26 +02:00
parent 0d63bb304e
commit 8ee9bd1a53
2 changed files with 293 additions and 0 deletions

View file

@ -0,0 +1,277 @@
#!/usr/bin/env python3
"""
Registry Image Tag Pruner - Keeps only the latest 1 SHA-tag per repository.
Usage:
python3 registry_prune.py \
--registry registry.nxtgauge.com \
--repo nxtgauge-rust-gateway \
--username "$REGISTRY_USERNAME" \
--password "$REGISTRY_PASSWORD"
Environment variables can also be used:
REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD
SHA-like tags are identified by pattern: ^[a-f0-9]{40}$
Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are NEVER deleted.
Exit code: always 0 once pruning starts (failures are logged but never fail the workflow); 1 only when a required argument is missing.
"""
import argparse
import base64
import json
import os
import sys
import time
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        # A negative count would make every SHA tag a deletion candidate.
        raise argparse.ArgumentTypeError(f"must be >= 0, got {number}")
    return number


def parse_args():
    """Parse command-line options for the pruner.

    Each connection option defaults to its environment variable
    (REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD), so
    the value is None when neither the flag nor the variable is supplied.

    Returns:
        argparse.Namespace with ``registry``, ``repo``, ``username``,
        ``password`` and ``keep`` (non-negative int, default 1).

    Raises:
        SystemExit: raised by argparse when ``--keep`` is not a non-negative
            integer or an unknown option is passed.
    """
    parser = argparse.ArgumentParser(
        description="Prune Docker registry tags, keeping only the latest SHA tag."
    )
    parser.add_argument("--registry", default=os.environ.get("REGISTRY_HOST"))
    parser.add_argument("--repo", default=os.environ.get("REGISTRY_REPO"))
    parser.add_argument("--username", default=os.environ.get("REGISTRY_USERNAME"))
    parser.add_argument("--password", default=os.environ.get("REGISTRY_PASSWORD"))
    parser.add_argument(
        "--keep",
        type=_non_negative_int,
        default=1,
        help="Number of SHA tags to keep (default: 1)",
    )
    return parser.parse_args()
def api_request(url: str, method: str, username: str, password: str, data=None, retries: int = 3) -> dict | None:
"""Make an authenticated API request with retry logic."""
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
headers = {
"Authorization": f"Basic {auth}",
"Content-Type": "application/json",
}
for attempt in range(1, retries + 1):
try:
req = Request(url, method=method, headers=headers, data=data)
with urlopen(req, timeout=30) as response:
content = response.read()
if content:
return json.loads(content)
return {}
except HTTPError as e:
if e.code == 401:
print(f" [ERROR] Authentication failed (401)")
return None
if e.code == 404:
print(f" [WARN] Resource not found: {url}")
return None
print(f" [RETRY {attempt}/{retries}] HTTP {e.code} for {url}")
except URLError as e:
print(f" [RETRY {attempt}/{retries}] URL error: {e.reason}")
except Exception as e:
print(f" [RETRY {attempt}/{retries}] Error: {e}")
if attempt < retries:
time.sleep(attempt * 2)
print(f" [ERROR] Failed after {retries} attempts for {url}")
return None
def get_tag_digest(registry: str, repo: str, tag: str, username: str, password: str) -> tuple[str, str] | None:
"""Get the digest (sha256:...) and created time for a tag."""
url = f"https://{registry}/v2/{repo}/manifests/{tag}"
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
for attempt in range(1, 4):
try:
req = Request(url, method="GET", headers={
"Authorization": f"Basic {auth}",
"Accept": "application/vnd.docker.distribution.manifest.v2+json",
})
with urlopen(req, timeout=30) as response:
digest = response.headers.get("Docker-Content-Digest", "")
created = response.headers.get("Date", "")
return digest, created
except Exception as e:
print(f" [RETRY {attempt}/3] Getting digest for {tag}: {e}")
time.sleep(attempt)
return None
def delete_tag(registry: str, repo: str, digest: str, username: str, password: str, retries: int = 3) -> bool:
    """Delete a manifest (and thereby the tags that point at it) by digest.

    Args:
        registry: Registry host (optionally ``host:port``).
        repo: Repository name.
        digest: Manifest digest, e.g. ``sha256:...``.
        username: Basic-auth user name.
        password: Basic-auth password.
        retries: Maximum number of attempts for transient failures.

    Returns:
        True when the registry accepted the delete or reports the manifest as
        already gone (404); False for an empty digest, a permanent client
        error, or when all attempts failed.
    """
    if not digest:
        # An empty digest would build ".../manifests/", whose 404 would be
        # misreported as a successful delete.
        print("  [ERROR] Refusing to delete: empty digest")
        return False
    url = f"https://{registry}/v2/{repo}/manifests/{digest}"
    auth = base64.b64encode(f"{username}:{password}".encode()).decode()
    for attempt in range(1, retries + 1):
        try:
            req = Request(url, method="DELETE", headers={
                "Authorization": f"Basic {auth}",
            })
            # urlopen raises HTTPError for non-2xx statuses, so returning
            # normally means the registry accepted the delete.
            with urlopen(req, timeout=30):
                return True
        except HTTPError as e:
            if e.code == 404:
                return True  # Already deleted
            # Other 4xx responses (e.g. 401, 403, 405) will not change on retry.
            if 400 <= e.code < 500 and e.code not in (408, 429):
                print(f"  [ERROR] Deleting {digest[:20]}...: HTTP {e.code} (not retryable)")
                return False
            print(f"  [RETRY {attempt}/{retries}] Deleting {digest[:20]}...: {e}")
        except Exception as e:
            print(f"  [RETRY {attempt}/{retries}] Deleting {digest[:20]}...: {e}")
        # Back off between attempts only.
        if attempt < retries:
            time.sleep(attempt)
    return False
_SHA_TAG_CHARS = frozenset("0123456789abcdef")


def is_sha_tag(tag: str) -> bool:
    """Return True if *tag* is exactly 40 lowercase hexadecimal characters.

    The whole string must match: a value with a trailing newline, uppercase
    hex digits, or any other length is not treated as a SHA tag.
    """
    return len(tag) == 40 and _SHA_TAG_CHARS.issuperset(tag)
def prune_tags(registry: str, repo: str, username: str, password: str, keep: int = 1) -> bool:
    """
    Prune old SHA-tagged images from one repository.

    Steps:
    - List all tags for the repo
    - Split them into SHA tags (prune candidates) and non-SHA tags (protected)
    - Sort SHA tags by created date (newest first) and keep the newest `keep`
    - Resolve the digests of protected and kept tags
    - Delete the remaining SHA tags by digest, skipping any digest that a
      protected or kept tag also points at (a delete by digest removes the
      manifest itself, so it would take those tags with it)

    Returns:
        True when there was nothing to do or every attempted deletion
        succeeded; False when listing failed, a protected tag could not be
        resolved, or at least one deletion failed.
    """
    print(f"\n=== Pruning {registry}/{repo} ===")
    print(f"Strategy: Keep {keep} newest SHA tag(s), delete older SHA tags")
    print(f"Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are preserved\n")

    # NOTE(review): only the first response of /v2/_catalog is inspected; if
    # the registry paginates the catalog, a repository beyond the first page
    # is reported as "not found" — confirm against the registry in use.
    catalog = api_request(f"https://{registry}/v2/_catalog", "GET", username, password)
    if catalog is None:
        print("[ERROR] Failed to get repository catalog")
        return False
    # "or []" also covers a JSON null value for the key.
    if repo not in (catalog.get("repositories") or []):
        print(f"[INFO] Repository {repo} not found in catalog")
        return True

    tags_data = api_request(f"https://{registry}/v2/{repo}/tags/list", "GET", username, password)
    if tags_data is None:
        print(f"[ERROR] Failed to get tags for {repo}")
        return False
    all_tags = tags_data.get("tags") or []
    if not all_tags:
        print("[INFO] No tags found")
        return True

    # Separate SHA tags from non-SHA tags
    sha_tags = [t for t in all_tags if is_sha_tag(t)]
    non_sha_tags = [t for t in all_tags if not is_sha_tag(t)]
    print(f"Total tags: {len(all_tags)}")
    print(f"  SHA tags (candidates for pruning): {len(sha_tags)}")
    print(f"  Non-SHA tags (protected): {len(non_sha_tags)}")
    if non_sha_tags:
        print(f"  Protected tags: {', '.join(sorted(non_sha_tags))}")
    if not sha_tags:
        print("\n[INFO] No SHA tags to prune")
        return True

    # Get digest and created time for each SHA tag; tags whose lookup fails
    # are left out entirely, so they are neither kept-by-rank nor deleted.
    tag_info = []
    for tag in sha_tags:
        result = get_tag_digest(registry, repo, tag, username, password)
        if result:
            digest, created = result
            tag_info.append({
                "tag": tag,
                "digest": digest,
                "created": created,
                "timestamp": parse_http_date(created) if created else 0,
            })
        time.sleep(0.1)  # Be nice to the registry
    if not tag_info:
        print("\n[ERROR] Could not get info for any SHA tags")
        return False

    # Sort by timestamp (newest first)
    tag_info.sort(key=lambda info: info["timestamp"], reverse=True)
    keep_count = max(keep, 0)
    kept = tag_info[:keep_count]
    doomed = tag_info[keep_count:]

    print(f"\nSHA tags sorted by age (newest first):")
    for i, info in enumerate(tag_info):
        marker = " [KEEP]" if i < keep_count else " [DELETE]"
        print(f"  {i+1}. {info['tag']} ({info['created'] or 'unknown date'}){marker}")

    for info in kept:
        print(f"\n[KEEP] {info['tag']}")

    # Digests that must survive, mapped to the tag that needs them.
    protected = {info["digest"]: info["tag"] for info in kept if info["digest"]}
    if doomed:
        for tag in non_sha_tags:
            result = get_tag_digest(registry, repo, tag, username, password)
            if not result or not result[0]:
                # Without this digest we cannot prove a delete is safe.
                print(f"\n[ERROR] Could not resolve protected tag {tag}; skipping all deletions to be safe")
                return False
            protected.setdefault(result[0], tag)
            time.sleep(0.1)  # Be nice to the registry

    # Delete older SHA tags
    deleted_count = 0
    skipped_count = 0
    failed_count = 0
    for info in doomed:
        digest = info["digest"]
        if not digest:
            print(f"\n[WARN] No digest known for {info['tag']}; cannot delete")
            failed_count += 1
            continue
        if digest in protected:
            print(f"\n[SKIP] {info['tag']} shares its manifest with {protected[digest]}; not deleting")
            skipped_count += 1
            continue
        print(f"\n[DELETE] {info['tag']} (digest: {digest[:20]}...)")
        if delete_tag(registry, repo, digest, username, password):
            print(f"  [OK] Deleted {info['tag']}")
            deleted_count += 1
        else:
            print(f"  [WARN] Failed to delete {info['tag']} (will retry next run)")
            failed_count += 1
        time.sleep(0.2)  # Be nice to the registry

    print(f"\n=== Prune Summary ===")
    print(f"Tags kept: {len(kept)}")
    print(f"Tags deleted: {deleted_count}")
    print(f"Tags skipped (shared manifest): {skipped_count}")
    print(f"Tags failed: {failed_count}")
    print(f"Tags protected (non-SHA): {len(non_sha_tags)}")
    return failed_count == 0
def parse_http_date(date_str: str) -> float:
    """Parse an HTTP Date header value into a POSIX timestamp.

    Args:
        date_str: Date string such as ``"Wed, 21 Oct 2015 07:28:00 GMT"``.

    Returns:
        Seconds since the epoch as a float, or 0.0 when the value cannot be
        parsed.
    """
    from datetime import timezone
    from email.utils import parsedate_to_datetime
    try:
        moment = parsedate_to_datetime(date_str)
        # A "-0000" zone yields a naive datetime; read it as UTC instead of
        # letting timestamp() apply the machine's local time zone.
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.timestamp()
    except (TypeError, ValueError, IndexError, OverflowError):
        return 0.0
def main():
    """Command-line entry point: resolve settings, run the prune, exit.

    Exits with status 1 when any of registry/repo/username/password is
    missing. Otherwise always exits with status 0, whether the prune
    succeeded, reported errors, or raised, so the calling workflow is
    never failed by this step.
    """
    args = parse_args()

    # Each option falls back to its environment variable when empty.
    fallbacks = (
        ("registry", "REGISTRY_HOST"),
        ("repo", "REGISTRY_REPO"),
        ("username", "REGISTRY_USERNAME"),
        ("password", "REGISTRY_PASSWORD"),
    )
    settings = {
        option: getattr(args, option) or os.environ.get(variable)
        for option, variable in fallbacks
    }
    registry = settings["registry"]
    repo = settings["repo"]
    username = settings["username"]
    password = settings["password"]

    if not (registry and repo and username and password):
        print("[ERROR] Missing required arguments. Need: --registry, --repo, --username, --password")
        print("Or set environment variables: REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD")
        sys.exit(1)

    for label, value in (("Registry", registry), ("Repository", repo), ("Username", username)):
        print(f"{label}: {value}")

    try:
        if prune_tags(registry, repo, username, password, args.keep):
            print("\n[OK] Prune completed successfully")
        else:
            print("\n[WARN] Prune completed with some errors")
    except Exception as e:
        print(f"\n[ERROR] Prune failed with exception: {e}")
    # Exit 0 per requirement - never fail workflow
    sys.exit(0)


if __name__ == "__main__":
    main()

View file

@ -62,3 +62,19 @@ jobs:
echo "Build failed after retries"
exit 1
# Post-push cleanup step: runs registry_prune.py against the
# nxtgauge-frontend-solid repository, keeping one SHA tag.
- name: Prune old image tags (keep latest 1 SHA)
# Run only when the earlier steps of the job succeeded.
if: success()
# A failing prune must not fail the job.
continue-on-error: true
# Registry host and credentials are taken from the secrets context.
env:
REGISTRY_HOST: ${{ secrets.REGISTRY_HOSTPORT }}
REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
# NOTE(review): registry_prune.py also reads REGISTRY_HOST, REGISTRY_USERNAME
# and REGISTRY_PASSWORD from the environment set above, so the matching
# flags below look redundant — confirm before removing them.
run: |
set -euo pipefail
python3 .gitea/scripts/registry_prune.py \
--registry "$REGISTRY_HOST" \
--repo "nxtgauge-frontend-solid" \
--username "$REGISTRY_USERNAME" \
--password "$REGISTRY_PASSWORD" \
--keep 1