ci: add post-push registry prune (keep latest 1 SHA build)
This commit is contained in:
parent
42a9a17133
commit
3703d70eb2
2 changed files with 293 additions and 0 deletions
277
.gitea/scripts/registry_prune.py
Normal file
277
.gitea/scripts/registry_prune.py
Normal file
|
|
@ -0,0 +1,277 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Registry Image Tag Pruner - Keeps only the latest 1 SHA-tag per repository.
|
||||
|
||||
Usage:
|
||||
python3 registry_prune.py \
|
||||
--registry registry.nxtgauge.com \
|
||||
--repo nxtgauge-rust-gateway \
|
||||
--username "$REGISTRY_USERNAME" \
|
||||
--password "$REGISTRY_PASSWORD"
|
||||
|
||||
Environment variables can also be used:
|
||||
REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD
|
||||
|
||||
SHA-like tags are identified by pattern: ^[a-f0-9]{40}$
|
||||
Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are NEVER deleted.
|
||||
|
||||
Exit code: 0 on success (or if prune fails gracefully), non-zero only on critical error.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Prune Docker registry tags, keeping only the latest SHA tag."
|
||||
)
|
||||
parser.add_argument("--registry", default=os.environ.get("REGISTRY_HOST"))
|
||||
parser.add_argument("--repo", default=os.environ.get("REGISTRY_REPO"))
|
||||
parser.add_argument("--username", default=os.environ.get("REGISTRY_USERNAME"))
|
||||
parser.add_argument("--password", default=os.environ.get("REGISTRY_PASSWORD"))
|
||||
parser.add_argument("--keep", type=int, default=1, help="Number of SHA tags to keep (default: 1)")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def api_request(url: str, method: str, username: str, password: str, data=None, retries: int = 3) -> dict | None:
|
||||
"""Make an authenticated API request with retry logic."""
|
||||
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
|
||||
headers = {
|
||||
"Authorization": f"Basic {auth}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
for attempt in range(1, retries + 1):
|
||||
try:
|
||||
req = Request(url, method=method, headers=headers, data=data)
|
||||
with urlopen(req, timeout=30) as response:
|
||||
content = response.read()
|
||||
if content:
|
||||
return json.loads(content)
|
||||
return {}
|
||||
except HTTPError as e:
|
||||
if e.code == 401:
|
||||
print(f" [ERROR] Authentication failed (401)")
|
||||
return None
|
||||
if e.code == 404:
|
||||
print(f" [WARN] Resource not found: {url}")
|
||||
return None
|
||||
print(f" [RETRY {attempt}/{retries}] HTTP {e.code} for {url}")
|
||||
except URLError as e:
|
||||
print(f" [RETRY {attempt}/{retries}] URL error: {e.reason}")
|
||||
except Exception as e:
|
||||
print(f" [RETRY {attempt}/{retries}] Error: {e}")
|
||||
|
||||
if attempt < retries:
|
||||
time.sleep(attempt * 2)
|
||||
|
||||
print(f" [ERROR] Failed after {retries} attempts for {url}")
|
||||
return None
|
||||
|
||||
|
||||
def get_tag_digest(registry: str, repo: str, tag: str, username: str, password: str) -> tuple[str, str] | None:
|
||||
"""Get the digest (sha256:...) and created time for a tag."""
|
||||
url = f"https://{registry}/v2/{repo}/manifests/{tag}"
|
||||
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
|
||||
|
||||
for attempt in range(1, 4):
|
||||
try:
|
||||
req = Request(url, method="GET", headers={
|
||||
"Authorization": f"Basic {auth}",
|
||||
"Accept": "application/vnd.docker.distribution.manifest.v2+json",
|
||||
})
|
||||
with urlopen(req, timeout=30) as response:
|
||||
digest = response.headers.get("Docker-Content-Digest", "")
|
||||
created = response.headers.get("Date", "")
|
||||
return digest, created
|
||||
except Exception as e:
|
||||
print(f" [RETRY {attempt}/3] Getting digest for {tag}: {e}")
|
||||
time.sleep(attempt)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def delete_tag(registry: str, repo: str, digest: str, username: str, password: str) -> bool:
|
||||
"""Delete a tag by its digest."""
|
||||
url = f"https://{registry}/v2/{repo}/manifests/{digest}"
|
||||
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
|
||||
|
||||
for attempt in range(1, 4):
|
||||
try:
|
||||
req = Request(url, method="DELETE", headers={
|
||||
"Authorization": f"Basic {auth}",
|
||||
})
|
||||
with urlopen(req, timeout=30) as response:
|
||||
if response.status in (200, 202, 404):
|
||||
return True
|
||||
except HTTPError as e:
|
||||
if e.code == 404:
|
||||
return True # Already deleted
|
||||
print(f" [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}")
|
||||
except Exception as e:
|
||||
print(f" [RETRY {attempt}/3] Deleting {digest[:20]}...: {e}")
|
||||
|
||||
time.sleep(attempt)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def is_sha_tag(tag: str) -> bool:
|
||||
"""Check if tag looks like a SHA (40 hex chars)."""
|
||||
import re
|
||||
return bool(re.match(r"^[a-f0-9]{40}$", tag))
|
||||
|
||||
|
||||
def prune_tags(registry: str, repo: str, username: str, password: str, keep: int = 1) -> bool:
|
||||
"""
|
||||
Main prune logic:
|
||||
- List all tags for the repo
|
||||
- Filter SHA-like tags
|
||||
- Sort by created date (newest first)
|
||||
- Keep newest `keep` tags
|
||||
- Delete older SHA tags by digest
|
||||
- Never delete non-SHA tags
|
||||
"""
|
||||
print(f"\n=== Pruning {registry}/{repo} ===")
|
||||
print(f"Strategy: Keep {keep} newest SHA tag(s), delete older SHA tags")
|
||||
print(f"Non-SHA tags (e.g., high-performance-latest, main-latest, latest) are preserved\n")
|
||||
|
||||
# Get catalog (list of repos)
|
||||
catalog_url = f"https://{registry}/v2/_catalog"
|
||||
catalog = api_request(catalog_url, "GET", username, password)
|
||||
if catalog is None:
|
||||
print("[ERROR] Failed to get repository catalog")
|
||||
return False
|
||||
|
||||
if repo not in catalog.get("repositories", []):
|
||||
print(f"[INFO] Repository {repo} not found in catalog")
|
||||
return True
|
||||
|
||||
# Get tags for repo
|
||||
tags_url = f"https://{registry}/v2/{repo}/tags/list"
|
||||
tags_data = api_request(tags_url, "GET", username, password)
|
||||
if tags_data is None:
|
||||
print(f"[ERROR] Failed to get tags for {repo}")
|
||||
return False
|
||||
|
||||
all_tags = tags_data.get("tags", [])
|
||||
if not all_tags:
|
||||
print("[INFO] No tags found")
|
||||
return True
|
||||
|
||||
# Separate SHA tags from non-SHA tags
|
||||
sha_tags = [t for t in all_tags if is_sha_tag(t)]
|
||||
non_sha_tags = [t for t in all_tags if not is_sha_tag(t)]
|
||||
|
||||
print(f"Total tags: {len(all_tags)}")
|
||||
print(f" SHA tags (candidates for pruning): {len(sha_tags)}")
|
||||
print(f" Non-SHA tags (protected): {len(non_sha_tags)}")
|
||||
if non_sha_tags:
|
||||
print(f" Protected tags: {', '.join(sorted(non_sha_tags))}")
|
||||
|
||||
if not sha_tags:
|
||||
print("\n[INFO] No SHA tags to prune")
|
||||
return True
|
||||
|
||||
# Get digest and created time for each SHA tag
|
||||
tag_info = []
|
||||
for tag in sha_tags:
|
||||
result = get_tag_digest(registry, repo, tag, username, password)
|
||||
if result:
|
||||
digest, created = result
|
||||
tag_info.append({
|
||||
"tag": tag,
|
||||
"digest": digest,
|
||||
"created": created,
|
||||
"timestamp": parse_http_date(created) if created else 0,
|
||||
})
|
||||
time.sleep(0.1) # Be nice to the registry
|
||||
|
||||
if not tag_info:
|
||||
print("\n[ERROR] Could not get info for any SHA tags")
|
||||
return False
|
||||
|
||||
# Sort by timestamp (newest first)
|
||||
tag_info.sort(key=lambda x: x["timestamp"], reverse=True)
|
||||
|
||||
print(f"\nSHA tags sorted by age (newest first):")
|
||||
for i, info in enumerate(tag_info):
|
||||
marker = " [KEEP]" if i < keep else " [DELETE]"
|
||||
print(f" {i+1}. {info['tag']} ({info['created'] or 'unknown date'}){marker}")
|
||||
|
||||
# Delete older SHA tags
|
||||
deleted_count = 0
|
||||
kept_count = 0
|
||||
|
||||
for i, info in enumerate(tag_info):
|
||||
if i < keep:
|
||||
print(f"\n[KEEP] {info['tag']}")
|
||||
kept_count += 1
|
||||
continue
|
||||
|
||||
print(f"\n[DELETE] {info['tag']} (digest: {info['digest'][:20]}...)")
|
||||
if delete_tag(registry, repo, info["digest"], username, password):
|
||||
print(f" [OK] Deleted {info['tag']}")
|
||||
deleted_count += 1
|
||||
else:
|
||||
print(f" [WARN] Failed to delete {info['tag']} (will retry next run)")
|
||||
|
||||
time.sleep(0.2) # Be nice to the registry
|
||||
|
||||
print(f"\n=== Prune Summary ===")
|
||||
print(f"Tags kept: {kept_count}")
|
||||
print(f"Tags deleted: {deleted_count}")
|
||||
print(f"Tags protected (non-SHA): {len(non_sha_tags)}")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def parse_http_date(date_str: str) -> float:
|
||||
"""Parse HTTP Date header to timestamp."""
|
||||
from email.utils import parsedate_to_datetime
|
||||
try:
|
||||
return parsedate_to_datetime(date_str).timestamp()
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
# Validate required args
|
||||
registry = args.registry or os.environ.get("REGISTRY_HOST")
|
||||
repo = args.repo or os.environ.get("REGISTRY_REPO")
|
||||
username = args.username or os.environ.get("REGISTRY_USERNAME")
|
||||
password = args.password or os.environ.get("REGISTRY_PASSWORD")
|
||||
|
||||
if not all([registry, repo, username, password]):
|
||||
print("[ERROR] Missing required arguments. Need: --registry, --repo, --username, --password")
|
||||
print("Or set environment variables: REGISTRY_HOST, REGISTRY_REPO, REGISTRY_USERNAME, REGISTRY_PASSWORD")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Registry: {registry}")
|
||||
print(f"Repository: {repo}")
|
||||
print(f"Username: {username}")
|
||||
|
||||
try:
|
||||
success = prune_tags(registry, repo, username, password, args.keep)
|
||||
if success:
|
||||
print("\n[OK] Prune completed successfully")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("\n[WARN] Prune completed with some errors")
|
||||
sys.exit(0) # Exit 0 per requirement - never fail workflow
|
||||
except Exception as e:
|
||||
print(f"\n[ERROR] Prune failed with exception: {e}")
|
||||
sys.exit(0) # Exit 0 per requirement - never fail workflow
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -213,3 +213,19 @@ jobs:
|
|||
|
||||
echo "Falling back to build without cache export for ${{ matrix.service }}"
|
||||
build_without_cache_export
|
||||
|
||||
- name: Prune old image tags (keep latest 1 SHA)
|
||||
if: success()
|
||||
continue-on-error: true
|
||||
env:
|
||||
REGISTRY_HOST: ${{ secrets.REGISTRY_HOSTPORT }}
|
||||
REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
|
||||
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
python3 .gitea/scripts/registry_prune.py \
|
||||
--registry "$REGISTRY_HOST" \
|
||||
--repo "nxtgauge-rust-${{ matrix.service }}" \
|
||||
--username "$REGISTRY_USERNAME" \
|
||||
--password "$REGISTRY_PASSWORD" \
|
||||
--keep 1
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue