# File ip-blocklist-update.py of package ip-blocklist

#!/usr/bin/env python

import os
import re
import requests
import subprocess
import tomllib


# Fallback directories, used when the corresponding environment variable is unset.
DEFAULT_CONF_DIR = "/etc/ip-blocklist"
DEFAULT_STATE_DIR = "/var/lib/ip-blocklist"

# Resolve each base directory once. The variable names match what systemd
# exports for ConfigurationDirectory=/StateDirectory= (presumably how this
# script is launched -- confirm against the unit file).
_conf_dir = os.environ.get("CONFIGURATION_DIRECTORY", DEFAULT_CONF_DIR)
_state_dir = os.environ.get("STATE_DIRECTORY", DEFAULT_STATE_DIR)

# Input: TOML file describing sources, allowlists and interfaces.
BLOCKLISTS_CONF = f"{_conf_dir}/blocklists.toml"

# Outputs: per-family element files plus the ruleset that includes them.
BLOCKLIST4 = f"{_state_dir}/blocklist4.conf"
BLOCKLIST6 = f"{_state_dir}/blocklist6.conf"
ALLOWLIST4 = f"{_state_dir}/allowlist4.conf"
ALLOWLIST6 = f"{_state_dir}/allowlist6.conf"
NFTABLES_CONF = f"{_state_dir}/nftables.conf"

# Matches lines to skip in downloaded lists: '#' comments (optionally
# indented) and lines that are empty or whitespace-only.
comment = re.compile(r"^((\s|\t)*#|(\s|\t)*$)")


def populate_blocklist(name, sources, target):
    """Download blocklist sources and write them as an nftables ``elements`` file.

    The target file is always rewritten and always contains a complete
    ``elements = { ... }`` block, even when nothing could be downloaded.

    Args:
        name: Label used in the "no sources available" message.
        sources: Iterable of URLs; each is expected to return one address
            or network per line. Blank lines and ``#`` comment lines are
            skipped.
        target: Path of the file to (over)write.

    Returns:
        The total number of address lines written across all sources.
    """
    total = 0

    with open(target, "w") as target_file:
        target_file.write("elements = {\n")

        if not sources:
            print(f"{name}: no sources available")

        for source in sources:
            print(f"Processing {source}...")

            # A network failure must not abort the run: that would leave the
            # target file without its closing brace. Treat it like a non-200
            # response and move on to the next source.
            try:
                response = requests.get(source, timeout=10)
            except requests.RequestException as error:
                print(f"Source is unavailable: {error}")
                continue

            if response.status_code != 200:
                print("Source is unavailable")
                continue

            try:
                addresses = response.content.decode("utf-8").splitlines()
            except UnicodeDecodeError as error:
                # Same reasoning as above: skip the source, keep the file valid.
                print(f"Source is unavailable: {error}")
                continue

            added = 0
            for address in addresses:
                if comment.match(address):
                    continue
                target_file.write(f"\t{address},\n")
                added += 1
            print(f"Addresses added: {added}")
            total += added

        target_file.write("}\n")

    return total


def populate_allowlist(name, source, target):
    """Write the configured allowlist entries as an nftables ``elements`` file.

    Args:
        name: Label used in the progress messages.
        source: Sized collection of addresses/networks taken from the
            configuration.
        target: Path of the file to (over)write.

    Returns:
        The number of entries written (0 when ``source`` is empty).
    """
    written = 0

    with open(target, "w") as out:
        out.write("elements = {\n")

        if len(source) == 0:
            print(f"{name}: source is empty")
        else:
            print(f"Processing {name}...")
            # enumerate(start=1) leaves `written` at the entry count.
            for written, entry in enumerate(source, start=1):
                out.write(f"\t{entry},\n")
            print(f"Addresses added: {written}")

        out.write("}\n")

    return written


def define_set(conf_file, list_name, list_type, list_file):
    """Append an nftables named-set definition to ``conf_file``.

    The set is declared with ``flags interval`` and ``auto-merge`` and pulls
    its elements in from ``list_file`` via ``include``.

    Args:
        conf_file: Writable text file object receiving the definition.
        list_name: Name of the set.
        list_type: nftables element type written after ``type``.
        list_file: Path placed in the ``include`` statement.
    """
    # The leading empty item yields the newline that separates this
    # definition from whatever was written before it; there is deliberately
    # no trailing newline.
    set_lines = (
        "",
        f"    set {list_name} {{",
        f"        type {list_type}",
        "        flags interval",
        "        auto-merge",
        f'        include "{list_file}"',
        "    }",
    )
    conf_file.write("\n".join(set_lines))

# Parse the TOML configuration; the file handle is only needed for the parse.
with open(BLOCKLISTS_CONF, "rb") as blocklists_conf_file:
    blocklists_conf = tomllib.load(blocklists_conf_file)

# Regenerate the element files. The returned counts decide further down
# which sets and rules are emitted into the ruleset.
blocklist4_count = populate_blocklist(
    "blocklist4", blocklists_conf["sources4"], BLOCKLIST4
)
blocklist6_count = populate_blocklist(
    "blocklist6", blocklists_conf["sources6"], BLOCKLIST6
)
allowlist4_count = populate_allowlist(
    "allow4", blocklists_conf["allow4"], ALLOWLIST4
)
allowlist6_count = populate_allowlist(
    "allow6", blocklists_conf["allow6"], ALLOWLIST6
)

# Interface names rendered as comma-separated lists, ready to be placed
# inside an nftables `{ ... }` anonymous set.
ext_ifs_s = ", ".join(blocklists_conf["ext_ifs"])
int_ifs_s = ", ".join(blocklists_conf["int_ifs"])

# Generate the nftables ruleset. Every fragment starts with a newline, so the
# file is built up piece by piece and ends without a trailing newline.
# NOTE(review): ext_ifs_s is interpolated unconditionally; an empty "ext_ifs"
# list would render `iifname {  }` -- confirm the configuration always
# provides at least one external interface.
with open(NFTABLES_CONF, "w") as nftables_conf_file:
    # Drop any previous instance of the table so the load replaces it.
    nftables_conf_file.write(f"""destroy table inet ip-blocklist
table inet ip-blocklist {{""")

    # Only declare sets whose element files are non-empty; the rules below
    # are guarded by the same counts so they never reference a missing set.
    if allowlist4_count > 0:
        define_set(nftables_conf_file, "allowlist4", "ipv4_addr", ALLOWLIST4)

    if allowlist6_count > 0:
        define_set(nftables_conf_file, "allowlist6", "ipv6_addr", ALLOWLIST6)

    if blocklist4_count > 0:
        define_set(nftables_conf_file, "blocklist4", "ipv4_addr", BLOCKLIST4)

    if blocklist6_count > 0:
        define_set(nftables_conf_file, "blocklist6", "ipv6_addr", BLOCKLIST6)

    if int_ifs_s:
        # Internal interfaces are configured: filter in prerouting, matching
        # the source address of traffic arriving on external interfaces and
        # the destination address of traffic arriving on internal ones.
        nftables_conf_file.write(f"""
    chain prerouting {{
        type filter hook prerouting priority dstnat""")

        # Allowlist rules are written first so they win over the blocklist.
        if allowlist4_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip saddr @allowlist4 accept
        iifname {{ {int_ifs_s} }} ip daddr @allowlist4 accept""")

        if allowlist6_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip6 saddr @allowlist6 accept
        iifname {{ {int_ifs_s} }} ip6 daddr @allowlist6 accept""")

        if blocklist4_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip saddr @blocklist4 drop
        iifname {{ {int_ifs_s} }} ip daddr @blocklist4 reject with icmp type admin-prohibited""")

        if blocklist6_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip6 saddr @blocklist6 drop
        iifname {{ {int_ifs_s} }} ip6 daddr @blocklist6 reject with icmpv6 type admin-prohibited""")

        nftables_conf_file.write(f"""
    }}""")
    else:
        # No internal interfaces: only inbound traffic on the external
        # interfaces is filtered, always by its source address.
        nftables_conf_file.write(f"""
    chain input {{
        type filter hook input priority filter""")

        if allowlist4_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip saddr @allowlist4 accept""")

        if allowlist6_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip6 saddr @allowlist6 accept""")

        if blocklist4_count > 0:
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip saddr @blocklist4 drop""")

        if blocklist6_count > 0:
            # Match the *source* address, like the IPv4 rule above. Matching
            # daddr here would never drop packets sent by blocklisted hosts.
            nftables_conf_file.write(f"""
        iifname {{ {ext_ifs_s} }} ip6 saddr @blocklist6 drop""")

        nftables_conf_file.write(f"""
    }}""")

    # Locally generated traffic leaving through an external interface is
    # matched on its destination address.
    nftables_conf_file.write(f"""
    chain output {{
        type filter hook output priority filter""")

    if allowlist4_count > 0:
        nftables_conf_file.write(f"""
        oifname {{ {ext_ifs_s} }} ip daddr @allowlist4 accept""")

    if allowlist6_count > 0:
        nftables_conf_file.write(f"""
        oifname {{ {ext_ifs_s} }} ip6 daddr @allowlist6 accept""")

    if blocklist4_count > 0:
        nftables_conf_file.write(f"""
        oifname {{ {ext_ifs_s} }} ip daddr @blocklist4 reject with icmp type admin-prohibited""")

    if blocklist6_count > 0:
        nftables_conf_file.write(f"""
        oifname {{ {ext_ifs_s} }} ip6 daddr @blocklist6 reject with icmpv6 type admin-prohibited""")

    # Close the output chain and the table.
    nftables_conf_file.write(f"""
    }}
}}""")

print("Reloading nftables...")
# Load the generated ruleset into the kernel.
nft_result = subprocess.run(["/usr/bin/nft", "--file", NFTABLES_CONF])
if nft_result.returncode != 0:
    # Do not report success when nft rejected the ruleset: exit non-zero so
    # the caller (service manager, cron, shell) can see the failure.
    raise SystemExit(f"nft failed with exit status {nft_result.returncode}")

print("Done.")
# openSUSE Build Service is sponsored by