File 20-helm-images of Package edge-build-checks

#!/usr/bin/python3
import os
import glob
import subprocess
import yaml
import sys
import pprint

# Image reference prefixes that are accepted without an explicit exception.
# check_template() compares them with str.startswith(), and main() may append
# one more entry taken from the rpm "img_repo" macro.
# NOTE(review): because the match is a plain prefix test, entries without a
# trailing "/" (the last two) also match longer names that merely begin with
# the same text -- confirm whether that is intended.
AUTHORIZED_REPOS = [
    "registry.suse.com/suse/sles/",
    "registry.suse.com/rancher",
    "registry.rancher.com",
]

# Cache for the parsed .checks_helm.yaml content; None means "not loaded yet".
# get_extra_config() fills it on first use.
EXTRA_CONFIG = None

class CheckResult:
    """Tally of the hints, warnings and errors reported during a run.

    Every reporting method prints the message with a severity prefix and
    bumps the matching counter (``hints``, ``warnings`` or ``errors``).
    """

    def __init__(self):
        self.hints = self.warnings = self.errors = 0

    def _record(self, label, counter, msg):
        """Print *msg* prefixed with *label* and increment attribute *counter*."""
        print(f"{label}: {msg}")
        setattr(self, counter, getattr(self, counter) + 1)

    def hint(self, msg):
        """Report an informational message."""
        self._record("Hint", "hints", msg)

    def warn(self, msg):
        """Report a non-fatal problem."""
        self._record("Warning", "warnings", msg)

    def error(self, msg):
        """Report a problem; main() exits non-zero when any were recorded."""
        self._record("Error", "errors", msg)


def tarballs():
    """Return the paths of the ``*.tgz`` chart archives to check.

    Without ``BUILD_ROOT`` in the environment the current directory is
    searched. Otherwise the archives are looked up in the ``HELM``
    directory below the build root's package top directory.
    """
    buildroot = os.environ.get("BUILD_ROOT")
    if buildroot is None:
        # Not running in an OBS build container
        return glob.glob("*.tgz")

    # Running in an OBS build container: work out the package top directory.
    marker = f"{buildroot}/.build.packages"
    if os.path.islink(marker):
        # A symlink wins over everything else; its target is taken relative
        # to the build root.
        topdir = "/" + os.readlink(marker)
    elif os.path.isdir(marker):
        topdir = "/.build.packages"
    else:
        topdir = "/usr/src/packages"

    return glob.glob(f"{buildroot}{topdir}/HELM/*.tgz")

def get_extra_config():
    """Return the parsed ``.checks_helm.yaml`` content, loading it only once.

    The file is read from the current directory when ``BUILD_ROOT`` is not
    set, otherwise from ``SOURCES`` below the build root. A file that cannot
    be opened or read, or one that holds no YAML document, yields an empty
    dict. The result is cached in the module-level ``EXTRA_CONFIG``.
    """
    global EXTRA_CONFIG
    if EXTRA_CONFIG is None:
        buildroot = os.environ.get("BUILD_ROOT")
        if buildroot is None:
            config_path = "./.checks_helm.yaml"
        else:
            # NOTE(review): the top directory is fixed here, whereas
            # tarballs() honours ".build.packages" -- confirm this is wanted.
            config_path = f"{buildroot}/usr/src/packages/SOURCES/.checks_helm.yaml"

        try:
            with open(config_path) as stream:
                parsed = yaml.safe_load(stream)
        except OSError:
            parsed = None

        # None covers both "file unavailable" and "no document in stream".
        EXTRA_CONFIG = {} if parsed is None else parsed
    return EXTRA_CONFIG

def get_extra_params():
    """Return extra command-line arguments for ``helm template``.

    Each entry of the ``extra_apis`` configuration key becomes a
    ``-a <api>`` argument pair (``-a`` being helm's short form of
    ``--api-versions``).

    Returns:
        A flat list of strings; empty when no extra APIs are configured.
    """
    config = get_extra_config()
    # A key written without a value ("extra_apis:") loads as None.
    apis = config.get('extra_apis') or []
    if isinstance(apis, str):
        # A scalar would otherwise be iterated character by character.
        apis = [apis]
    return [token for api in apis for token in ('-a', api)]

def is_exception(image):
    """Tell whether *image* is listed in the ``image_exceptions`` config key.

    The comparison uses the image reference with its digest (``@...``) and
    tag (``:...``) removed.

    Args:
        image: Image reference string, e.g. ``registry/ns/name:tag``.

    Returns:
        True when the stripped reference is one of the configured exceptions.
    """
    config = get_extra_config()
    # A key written without a value ("image_exceptions:") loads as None.
    exceptions = config.get('image_exceptions') or []
    if isinstance(exceptions, str):
        # Keep "in" an equality test rather than a substring test.
        exceptions = [exceptions]

    # Drop a digest first, then a tag. Only a colon after the last "/" is a
    # tag separator; an earlier one belongs to a registry "host:port".
    repository = image.partition('@')[0]
    tag_start = repository.find(':', repository.rfind('/') + 1)
    if tag_start != -1:
        repository = repository[:tag_start]
    return repository in exceptions

def get_template(tarball_path):
    """Render the chart at *tarball_path* and return its YAML documents.

    Runs ``helm template`` on the archive, with the arguments from
    get_extra_params() appended, and passes the decoded output to
    ``yaml.safe_load_all``, whose result is returned as is. A non-zero helm
    exit status raises ``subprocess.CalledProcessError``.
    """
    command = ["helm", "template", tarball_path]
    command.extend(get_extra_params())
    rendered = subprocess.check_output(command)
    return yaml.safe_load_all(rendered.decode())


def extract_key(key, var):
    """Yield every value stored under *key* anywhere inside *var*.

    *var* is walked depth-first: nested dicts are searched recursively and
    each element of a list value is searched in turn. A matching value is
    yielded before its own content is searched. Anything without an
    ``items`` attribute yields nothing.
    """
    if not hasattr(var, "items"):
        return
    for name, value in var.items():
        if name == key:
            yield value
        if isinstance(value, dict):
            yield from extract_key(key, value)
        elif isinstance(value, list):
            for element in value:
                yield from extract_key(key, element)


def check_template(result, template):
    """Report every image in *template* that is not from an authorized source.

    Only workload documents (Pod, Deployment, StatefulSet, DaemonSet,
    ReplicaSet, Job, CronJob) are inspected. An image is accepted when it
    starts with one of ``AUTHORIZED_REPOS`` or when is_exception() allows it.
    Every other image is reported through ``result.error``.

    Args:
        result: Object providing ``warn(msg)`` and ``error(msg)``.
        template: One parsed YAML document from the rendered chart.
    """
    workload_kinds = (
        "Pod",
        "Deployment",
        "StatefulSet",
        "DaemonSet",
        "ReplicaSet",
        "Job",
        "CronJob",
    )
    # A rendered document may be a scalar or list, or may lack "kind";
    # neither can be a workload, so skip it instead of raising.
    if not isinstance(template, dict):
        return
    if template.get("kind") not in workload_kinds:
        return

    authorized_prefixes = tuple(AUTHORIZED_REPOS)
    for value in extract_key("image", template):
        # An "image" key may hold a mapping rather than a string, e.g. a
        # Kubernetes image volume source, which names its image under
        # "reference".
        image = value.get("reference") if isinstance(value, dict) else value
        if not isinstance(image, str):
            result.warn(f"Skipping non-string image value: {value!r}")
            continue
        if not image.startswith(authorized_prefixes) and not is_exception(image):
            result.error(f"{image} is not from authorized source")


def main():
    """Check the images of every chart archive and exit with the verdict.

    The rpm macro ``img_repo`` is expanded first; a non-empty value is added
    to ``AUTHORIZED_REPOS``. Each archive from tarballs() is then rendered
    and every non-empty document is passed to check_template(). The process
    exits with status 1 when at least one error was recorded, otherwise 0.
    """
    result = CheckResult()

    rpm_command = ["rpm", "--macros=/root/.rpmmacros", "-E", "%{?img_repo}"]
    img_repo = subprocess.check_output(rpm_command).strip().decode()
    if img_repo:
        result.hint(f"Adding '{img_repo}' to authorized repo")
        AUTHORIZED_REPOS.append(img_repo)
    else:
        result.warn("img_repo macro not defined, will not add extra authorized repo")

    for tarball in tarballs():
        print(f"Looking at {tarball}")
        # filter(None, ...) drops empty documents.
        for template in filter(None, get_template(tarball)):
            check_template(result, template)

    if result.errors > 0:
        print("Fatal errors found.")
        sys.exit(1)
    sys.exit(0)


# Run the check only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
openSUSE Build Service is sponsored by