File update.py of Package mullvadvpn-slowroll

#!/usr/bin/env python
import os
import re
import git
import sys
import shutil
import tarfile
import tempfile
import urllib
import requests
import subprocess
import argparse

repo_name = "mullvad/mullvadvpn-app"
LIST_RELEASE_URL = f"https://api.github.com/repos/{repo_name}/releases"
current_dir = os.path.dirname(os.path.realpath(__file__))

def get_current_release():
    files = list(filter(lambda x: x.endswith(".rpm"), os.listdir(current_dir)))
    assert len(files) == 1, f"Found more than 1 rpm file: {files}"
    reg = ".*-(.*)_x86_64.rpm"
    ver = re.match(reg, files[0]).groups()[0]
    return ver

def get_latest_release(repo, req_tag, arch="x86_64"):
    def get_tarball_url(rel):
        return rel["tarball_url"]
    def get_rpm_url(rel):
        rpm_asset = list(filter(lambda x: x["browser_download_url"].endswith(".rpm") \
                           and arch in x["browser_download_url"], rel["assets"]))
        assert len(rpm_asset) == 1, f"Unexpected rpm asset: {rpm_asset}"
        return rpm_asset[0]["browser_download_url"]
    def get_rpm_sig_url(rel):
        rpm_sig_asset = list(filter(lambda x: x["browser_download_url"].endswith(".rpm.asc") \
                           and arch in x["browser_download_url"], rel["assets"]))
        assert len(rpm_sig_asset) == 1, f"Unexpected rpm sig asset: {rpm_asset}"
        return rpm_sig_asset[0]["browser_download_url"]
    response = requests.get(LIST_RELEASE_URL)
    obj = response.json()
    tag_names = list(filter(lambda x: re.match("[0-9]{4}\..*", x), map(lambda x: x["tag_name"], obj)))
    tag_name = req_tag if req_tag else tag_names[0]
    print(f"Updating to tag {tag_name}")
    requested_rel = list(filter(lambda x: x["tag_name"] == tag_name, obj))[0]
    tarball_url = get_tarball_url(requested_rel)
    rpm_url = get_rpm_url(requested_rel)
    rpm_sig_url = get_rpm_sig_url(requested_rel)
    return tag_name, tarball_url, rpm_url, rpm_sig_url

def get_url_file(url, filename) -> str:
    target_path = os.path.join(current_dir, filename)
    urllib.request.urlretrieve(url, target_path)
    return target_path

def verify_rpm(rpm, rpm_asc):
    subprocess.check_call(["gpg", "--verify", rpm_asc, rpm],
                          stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def compress_tar(dpath, basedir, fname, fbasedir) -> str:
    target_tar_path = os.path.join(fbasedir, fname)
    subprocess.check_call(["tar", "czf", target_tar_path, dpath],
                          stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                          cwd=basedir)
    return target_tar_path
    
def rename_directory(oldname, newname):
    os.rename(oldname, newname)

def delete_directory(dirpath):
    shutil.rmtree(dirpath)

def update_spec_version(new_tag):
    def get_short_ver(full_ver):
        return full_ver.replace("-beta", "b")
    def replace_full_version(line):
        if not line.startswith("%define ver"):
            return line
        else:
            return f"%define ver     {new_tag}"
    def replace_version(line):
        if not line.startswith("Version:  "):
            return line
        else:
            return f"Version:        {get_short_ver(new_tag)}" + "\n"
    with open("mullvadvpn.spec", "r") as spec_file:
        spec_lines = spec_file.readlines()
    new_spec_lines = list(map(lambda x: replace_version(x), spec_lines))
    with open("mullvadvpn.spec", "w") as spec_file:
        spec_file.write("".join(new_spec_lines))

def move_file(filepath, newdir, newname):
    os.rename(filepath, os.path.join(newdir, newname))

def vendor_go(basedir):
    go_dir = os.path.join(basedir, "wireguard-go-rs", "libwg")
    subprocess.check_call(["go", "mod", "vendor", "-e"],
                          stdout=subprocess.DEVNULL,
                          stderr=subprocess.DEVNULL,
                          cwd=go_dir)
    compress_tar(dpath="vendor",
                 basedir=go_dir,
                 fname="wireguard-vendor.tar.gz",
                 fbasedir=os.environ["PWD"])

def vendor_rust(basedir):
    output = subprocess.check_output(["cargo", "vendor"],
                                     stderr=subprocess.DEVNULL,
                                     cwd=basedir).decode()
    with open(os.path.join(os.environ["PWD"], "cargo-config"), "w") as fp:
        fp.write(output)
    compress_tar(dpath="vendor",
                 basedir=basedir,
                 fname="mullvadvpn-app-vendor.tar.gz",
                 fbasedir=os.environ["PWD"])

def gen_api_ip_address(srcdir, fname):
    with open(fname, "w") as fp:
        subprocess.check_call(["cargo", "run", "--bin", "address_cache", "--release"],
                              stdout=fp, cwd=srcdir, stderr=subprocess.DEVNULL)

def gen_relay(srcdir, fname):
    with open(fname, "w") as fp:
        subprocess.check_call(["cargo", "run", "--bin", "relay_list", "--release"],
                              stdout=fp, cwd=srcdir, stderr=subprocess.DEVNULL)

def process_tarball(tmppath, tar_fname, tag_name):
    decompress_tar_path = "mullvad-vpn-tarfile"
    newtar_dirname = f"mullvadvpn-app-{tag_name}"
    decompress_tar_parentdir = os.path.join(tmppath, decompress_tar_path)
    t = tarfile.open(os.path.join(tmppath, tar_fname))
    t.extractall(path=decompress_tar_parentdir)
    t.close()
    l = os.listdir(decompress_tar_parentdir)
    assert len(l) == 1, f"The tarfile contains more than 1 member {l}. Exit."
    decompress_tar_fullpath = os.path.join(decompress_tar_parentdir, l[0])

    # by default, the directory naming is like `mullvad-mullvadvpn-app-aa6bac3`
    # we need to rename it to `newtar_fullpath`
    newtar_fullpath = os.path.join(decompress_tar_parentdir, newtar_dirname)
    rename_directory(decompress_tar_fullpath, newtar_fullpath)

    # re-compress and copy the tar file
    tname = f"{newtar_dirname}.tar.gz"
    compress_tar(dpath=newtar_dirname,
                 basedir=decompress_tar_parentdir,
                 fname=f"{newtar_dirname}.tar.gz",
                 fbasedir=os.environ["PWD"])

    # generate relay json
    print("Generating relays.json...", end="", flush=True)
    gen_relay(newtar_fullpath, "relays.json")
    print("OK")

    # vendor go
    print("Vendoring wireguard-go deps...", end="", flush=True)
    vendor_go(newtar_fullpath)
    print("OK")

    # vendor rust
    print("Vendoring rust deps...", end="", flush=True)
    vendor_rust(newtar_fullpath)
    print("OK")
    return

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--tag", default=None, help="Specify the tag name")
    args = parser.parse_args()

    tempdir = tempfile.mkdtemp(prefix="mullvad-packager")

    # get the tag name
    tag_name, tarball_url, rpm_url, rpm_sig_url = get_latest_release(repo_name, args.tag)
    print(f"work dir: {tempdir}, getting tag name: {tag_name}, tarball url: {tarball_url},"
          f" rpm url: {rpm_url}, rpm sig url: {rpm_sig_url}")

    # download source tarball
    tar_fname = f"mullvadvpn-app-{tag_name}.tar.gz"
    print(f"downloading tarball {tar_fname} ...", end="", flush=True)
    get_url_file(tarball_url, os.path.join(tempdir, tar_fname))
    print("OK")

    # download rpm file
    rpm_fname = f"MullvadVPN-{tag_name}_x86_64.rpm"
    print(f"downloading rpm {rpm_fname} ...", end="", flush=True)
    rpm_file = get_url_file(rpm_url, os.path.join(os.environ["PWD"], rpm_fname))
    print("OK")

    # download rpm sig file
    rpm_asc_fname = f"{rpm_fname}.asc"
    print(f"downloading rpm sig {rpm_asc_fname} ...", end="", flush=True)
    rpm_asc_file = get_url_file(rpm_sig_url, os.path.join(os.environ["PWD"], rpm_asc_fname))
    print("OK")

    # Verify rpm
    verify_rpm(rpm_file, rpm_asc_file)

    process_tarball(tempdir, tar_fname, tag_name)

    print(f"removing tmp path {tempdir} ...", end="", flush=True)
    delete_directory(tempdir)
    print("OK")

    print(f"updating spec file: {tag_name}")
    update_spec_version(tag_name)
    print("done")
openSUSE Build Service is sponsored by