File ip-blocklist-update.py of Package ip-blocklist
#!/usr/bin/env python
import ipaddress
import os
import re
import subprocess
import tomllib

import requests
# Fallback locations used when the corresponding environment variable is unset.
DEFAULT_CONF_DIR = "/etc/ip-blocklist"
DEFAULT_STATE_DIR = "/var/lib/ip-blocklist"

# Resolve each directory once: the environment value wins over the default.
_conf_dir = os.environ.get("CONFIGURATION_DIRECTORY", DEFAULT_CONF_DIR)
_state_dir = os.environ.get("STATE_DIRECTORY", DEFAULT_STATE_DIR)

# Input: TOML file listing sources, allowlists and interface names.
BLOCKLISTS_CONF = f"{_conf_dir}/blocklists.toml"

# Outputs: one element file per set, plus the generated ruleset that
# includes them.
BLOCKLIST4 = f"{_state_dir}/blocklist4.conf"
BLOCKLIST6 = f"{_state_dir}/blocklist6.conf"
ALLOWLIST4 = f"{_state_dir}/allowlist4.conf"
ALLOWLIST6 = f"{_state_dir}/allowlist6.conf"
NFTABLES_CONF = f"{_state_dir}/nftables.conf"

# Matches lines that should be ignored in a downloaded list: optional leading
# whitespace followed by '#', or a line that is empty / whitespace only.
comment = re.compile(r"^((\s|\t)*#|(\s|\t)*$)")
# Characters that may appear in an address, a CIDR prefix or one end of a
# range. Checked before parsing so nothing outside this alphabet (for example
# an IPv6 "%scope" suffix or nft syntax such as '}') can reach the set file.
_ENTRY_CHARS = re.compile(r"[0-9A-Fa-f:./]+")


def _normalize_entry(text):
    """Return *text* as a safe nftables set element, or None if it is not one.

    Accepted forms are a single address, a CIDR prefix (host bits allowed),
    and an inclusive range written as ``first-last`` where both ends belong
    to the same address family and ``first <= last``.
    """
    first, dash, last = (part.strip() for part in text.partition("-"))
    parts = [first, last] if dash else [first]
    if not all(_ENTRY_CHARS.fullmatch(part) for part in parts):
        return None
    try:
        if dash:
            low = ipaddress.ip_address(first)
            high = ipaddress.ip_address(last)
            # Compare versions first: ordering mixed families raises TypeError.
            if low.version != high.version or low > high:
                return None
            return f"{first}-{last}"
        ipaddress.ip_network(first, strict=False)
    except ValueError:
        return None
    return first


def populate_blocklist(name, sources, target):
    """Download every blocklist source and write an nftables element file.

    The file at *target* is rewritten from scratch as an
    ``elements = { ... }`` block with one tab-indented, comma-terminated
    entry per line.

    Each downloaded line is cut at the first ``#`` or ``;`` (so both
    whole-line and trailing comments are dropped) and stripped. Blank
    results are ignored; anything that is not an address, prefix or range is
    skipped and reported, because the content comes from the network and is
    later loaded by nft.

    A source that cannot be fetched (network error, timeout, or a status
    other than 200) is reported and skipped; remaining sources are still
    processed so the element file is always completed.

    Args:
        name: Label used in the "no sources available" message.
        sources: Sequence of URLs to download.
        target: Path of the element file to write.

    Returns:
        Total number of entries written across all sources.

    Raises:
        OSError: If *target* cannot be opened for writing.
    """
    total = 0
    with open(target, "w") as target_file:
        target_file.write("elements = {\n")
        if not sources:
            print(f"{name}: no sources available")
        else:
            for source in sources:
                print(f"Processing {source}...")
                try:
                    response = requests.get(source, timeout=10)
                except requests.RequestException as err:
                    # One unreachable source must not abort the whole update.
                    print(f"Source is unavailable: {err}")
                    continue
                if response.status_code != 200:
                    print("Source is unavailable")
                    continue

                added = 0
                skipped = 0
                # errors="replace": undecodable bytes turn into U+FFFD, which
                # then fails validation instead of raising mid-download.
                body = response.content.decode("utf-8", errors="replace")
                for line in body.splitlines():
                    text = re.split(r"[#;]", line, maxsplit=1)[0].strip()
                    if not text:
                        continue
                    entry = _normalize_entry(text)
                    if entry is None:
                        skipped += 1
                        continue
                    target_file.write(f"\t{entry},\n")
                    added += 1

                print(f"Addresses added: {added}")
                if skipped:
                    print(f"Invalid entries skipped: {skipped}")
                total += added
        target_file.write("}\n")
    return total
def populate_allowlist(name, source, target):
    """Write the configured allowlist entries to an nftables element file.

    The file at *target* is rewritten as an ``elements = { ... }`` block with
    one tab-indented, comma-terminated entry per line, in the order given.

    Args:
        name: Label used in the progress messages.
        source: Sequence of entries to write verbatim.
        target: Path of the element file to write.

    Returns:
        Number of entries written (0 when *source* is empty).
    """
    written = 0
    with open(target, "w") as out:
        out.write("elements = {\n")
        if len(source) == 0:
            print(f"{name}: source is empty")
        else:
            print(f"Processing {name}...")
            for written, entry in enumerate(source, start=1):
                out.write(f"\t{entry},\n")
            print(f"Addresses added: {written}")
        out.write("}\n")
    return written
def define_set(conf_file, list_name, list_type, list_file):
    """Append a named-set definition to the ruleset being generated.

    The set is declared with the ``interval`` flag and ``auto-merge``, and
    its elements are pulled in through an ``include`` of *list_file*.

    Args:
        conf_file: Writable text file object for the ruleset.
        list_name: Name of the set.
        list_type: Element type of the set (e.g. ``ipv4_addr``).
        list_file: Path of the element file to include.
    """
    # The leading empty item yields the newline that separates this stanza
    # from whatever was written before it; there is no trailing newline.
    stanza_lines = (
        "",
        f"set {list_name} {{",
        f"type {list_type}",
        "flags interval",
        "auto-merge",
        f'include "{list_file}"',
        "}",
    )
    conf_file.write("\n".join(stanza_lines))
# ---------------------------------------------------------------------------
# Script body: read the configuration, regenerate the element files, write
# the ruleset that references them, then load it with nft.
# ---------------------------------------------------------------------------

with open(BLOCKLISTS_CONF, "rb") as blocklists_conf_file:
    blocklists_conf = tomllib.load(blocklists_conf_file)

# Each call rewrites its element file and returns how many entries it wrote.
# A set (and the rules that use it) is only emitted when the count is > 0.
blocklist4_count = populate_blocklist("blocklist4", blocklists_conf["sources4"], BLOCKLIST4)
blocklist6_count = populate_blocklist("blocklist6", blocklists_conf["sources6"], BLOCKLIST6)
allowlist4_count = populate_allowlist("allow4", blocklists_conf["allow4"], ALLOWLIST4)
allowlist6_count = populate_allowlist("allow6", blocklists_conf["allow6"], ALLOWLIST6)

# Interface names rendered as the inside of an nft anonymous set: "a, b, c".
ext_ifs_s = ", ".join(blocklists_conf["ext_ifs"])
int_ifs_s = ", ".join(blocklists_conf["int_ifs"])

with open(NFTABLES_CONF, "w") as nftables_conf_file:
    # Replace any previous instance of the table so reloads start clean.
    nftables_conf_file.write(
        "destroy table inet ip-blocklist\n"
        "table inet ip-blocklist {"
    )

    if allowlist4_count > 0:
        define_set(nftables_conf_file, "allowlist4", "ipv4_addr", ALLOWLIST4)
    if allowlist6_count > 0:
        define_set(nftables_conf_file, "allowlist6", "ipv6_addr", ALLOWLIST6)
    if blocklist4_count > 0:
        define_set(nftables_conf_file, "blocklist4", "ipv4_addr", BLOCKLIST4)
    if blocklist6_count > 0:
        define_set(nftables_conf_file, "blocklist6", "ipv6_addr", BLOCKLIST6)

    # Within every chain the allowlist rules are written before the blocklist
    # rules, so an allowlisted address is accepted before a block can match.
    if int_ifs_s:
        # Internal interfaces are configured: filter in prerouting, matching
        # listed sources arriving on external interfaces and listed
        # destinations arriving on internal interfaces.
        nftables_conf_file.write(
            "\nchain prerouting {"
            "\ntype filter hook prerouting priority dstnat"
        )
        if allowlist4_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip saddr @allowlist4 accept"
                f"\niifname {{ {int_ifs_s} }} ip daddr @allowlist4 accept"
            )
        if allowlist6_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip6 saddr @allowlist6 accept"
                f"\niifname {{ {int_ifs_s} }} ip6 daddr @allowlist6 accept"
            )
        if blocklist4_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip saddr @blocklist4 drop"
                f"\niifname {{ {int_ifs_s} }} ip daddr @blocklist4"
                " reject with icmp type admin-prohibited"
            )
        if blocklist6_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip6 saddr @blocklist6 drop"
                f"\niifname {{ {int_ifs_s} }} ip6 daddr @blocklist6"
                " reject with icmpv6 type admin-prohibited"
            )
        nftables_conf_file.write("\n}")
    else:
        # No internal interfaces: filter only traffic addressed to this host.
        nftables_conf_file.write(
            "\nchain input {"
            "\ntype filter hook input priority filter"
        )
        if allowlist4_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip saddr @allowlist4 accept"
            )
        if allowlist6_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip6 saddr @allowlist6 accept"
            )
        if blocklist4_count > 0:
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip saddr @blocklist4 drop"
            )
        if blocklist6_count > 0:
            # Fixed: this rule matched "ip6 daddr", unlike every other input
            # rule, so blocklisted IPv6 *sources* were never dropped.
            nftables_conf_file.write(
                f"\niifname {{ {ext_ifs_s} }} ip6 saddr @blocklist6 drop"
            )
        nftables_conf_file.write("\n}")

    # Locally generated traffic leaving through an external interface is
    # filtered in both modes.
    nftables_conf_file.write(
        "\nchain output {"
        "\ntype filter hook output priority filter"
    )
    if allowlist4_count > 0:
        nftables_conf_file.write(
            f"\noifname {{ {ext_ifs_s} }} ip daddr @allowlist4 accept"
        )
    if allowlist6_count > 0:
        nftables_conf_file.write(
            f"\noifname {{ {ext_ifs_s} }} ip6 daddr @allowlist6 accept"
        )
    if blocklist4_count > 0:
        nftables_conf_file.write(
            f"\noifname {{ {ext_ifs_s} }} ip daddr @blocklist4"
            " reject with icmp type admin-prohibited"
        )
    if blocklist6_count > 0:
        nftables_conf_file.write(
            f"\noifname {{ {ext_ifs_s} }} ip6 daddr @blocklist6"
            " reject with icmpv6 type admin-prohibited"
        )
    # Close the output chain, then the table.
    nftables_conf_file.write("\n}\n}")

# The ruleset file is closed (and therefore flushed) before nft reads it.
print("Reloading nftables...")
nft_result = subprocess.run(["/usr/bin/nft", "--file", NFTABLES_CONF])
if nft_result.returncode != 0:
    # Do not report success when the ruleset was rejected; exit non-zero so
    # the caller (e.g. a service manager) can see the failure.
    raise SystemExit(f"nft failed with exit status {nft_result.returncode}")
print("Done.")