File add-ip-filtering-by-network.patch of Package salt
From a8615ab8f3debdc5962ecda5c52a432987bde02a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcus=20R=C3=BCckert?= <darix@nordisch.org>
Date: Thu, 20 Feb 2020 17:13:31 +0100
Subject: [PATCH] Add IP filtering by network
IPs are filtered out if they don't belong to any of the given networks.
If `None` is passed as the network, all IPs are returned. An empty list
rejects all IPs.
Example:
{% set networks = ['192.168.0.0/24', 'fe80::/64'] %}
{{ grains['ip_interfaces'] | filter_by_networks(networks) }}
{{ grains['ipv6'] | filter_by_networks(networks) }}
{{ grains['ipv4'] | filter_by_networks(networks) }}
Fixes #212
Co-authored-by: Alexander Graul <agraul@suse.com>
Add unit tests for filter_by_networks
---
salt/utils/network.py | 35 +++++++++++++++++++++-
tests/unit/utils/test_network.py | 50 ++++++++++++++++++++++++++++++++
2 files changed, 84 insertions(+), 1 deletion(-)
diff --git a/salt/utils/network.py b/salt/utils/network.py
index def997f3dc36654efacff00bf14d00c43c7ff7d9..09fb0ac2346a27e5383fc2904b85011e467e5fb8 100644
--- a/salt/utils/network.py
+++ b/salt/utils/network.py
@@ -5,7 +5,9 @@ Define some generic socket functions for network modules
'''
# Import python libs
-from __future__ import absolute_import, unicode_literals, print_function
+from __future__ import absolute_import, print_function, unicode_literals
+
+import collections
import itertools
import os
import re
@@ -1987,3 +1989,34 @@ def is_fqdn(hostname):
compliant = re.compile(r"(?!-)[A-Z\d\-\_]{1,63}(?<!-)$", re.IGNORECASE)
return "." in hostname and len(hostname) < 0xff and all(compliant.match(x) for x in hostname.rstrip(".").split("."))
+
+
+@jinja_filter("filter_by_networks")
+def filter_by_networks(values, networks):
+ """
+ Returns the list of IPs filtered by the network list.
+ If the network list is an empty sequence, no IPs are returned.
+ If the network list is None, all IPs are returned.
+
+ {% set networks = ['192.168.0.0/24', 'fe80::/64'] %}
+ {{ grains['ip_interfaces'] | filter_by_networks(networks) }}
+ {{ grains['ipv6'] | filter_by_networks(networks) }}
+ {{ grains['ipv4'] | filter_by_networks(networks) }}
+ """
+
+ _filter = lambda ips, networks: [
+ ip for ip in ips for net in networks if ipaddress.ip_address(ip) in net
+ ]
+
+ if networks is not None:
+ networks = [ipaddress.ip_network(network) for network in networks]
+ if isinstance(values, collections.Mapping):
+ return {
+ interface: _filter(values[interface], networks) for interface in values
+ }
+ elif isinstance(values, collections.Sequence):
+ return _filter(values, networks)
+ else:
+ raise ValueError("Do not know how to filter a {}".format(type(values)))
+ else:
+ return values
diff --git a/tests/unit/utils/test_network.py b/tests/unit/utils/test_network.py
index 74479b0cae87a33b0ecc9648f568c4d87bb6e445..b734f862f68edbd6cf25658ff1a2d25a0c2f4383 100644
--- a/tests/unit/utils/test_network.py
+++ b/tests/unit/utils/test_network.py
@@ -13,6 +13,8 @@ from tests.support.mock import (
create_autospec,
patch,
)
+import pytest
+import salt.exceptions
# Import salt libs
import salt.utils.network as network
@@ -725,3 +727,51 @@ class NetworkTestCase(TestCase):
"""
for fqdn in ["hostname", "/some/path", "$variable.here", "verylonghostname.{}".format("domain" * 45)]:
assert not network.is_fqdn(fqdn)
+
+ def test_filter_by_networks_with_no_filter(self):
+ ips = ["10.0.123.200", "10.10.10.10"]
+ with pytest.raises(TypeError):
+ network.filter_by_networks(ips) # pylint: disable=no-value-for-parameter
+
+ def test_filter_by_networks_empty_filter(self):
+ ips = ["10.0.123.200", "10.10.10.10"]
+ assert network.filter_by_networks(ips, []) == []
+
+ def test_filter_by_networks_ips_list(self):
+ ips = [
+ "10.0.123.200",
+ "10.10.10.10",
+ "193.124.233.5",
+ "fe80::d210:cf3f:64e7:5423",
+ ]
+ networks = ["10.0.0.0/8", "fe80::/64"]
+ assert network.filter_by_networks(ips, networks) == [
+ "10.0.123.200",
+ "10.10.10.10",
+ "fe80::d210:cf3f:64e7:5423",
+ ]
+
+ def test_filter_by_networks_interfaces_dict(self):
+ interfaces = {
+ "wlan0": ["192.168.1.100", "217.5.140.67", "2001:db8::ff00:42:8329"],
+ "eth0": [
+ "2001:0DB8:0:CD30:123:4567:89AB:CDEF",
+ "192.168.1.101",
+ "10.0.123.201",
+ ],
+ }
+ assert network.filter_by_networks(
+ interfaces, ["192.168.1.0/24", "2001:db8::/48"]
+ ) == {
+ "wlan0": ["192.168.1.100", "2001:db8::ff00:42:8329"],
+ "eth0": ["2001:0DB8:0:CD30:123:4567:89AB:CDEF", "192.168.1.101"],
+ }
+
+ def test_filter_by_networks_catch_all(self):
+ ips = [
+ "10.0.123.200",
+ "10.10.10.10",
+ "193.124.233.5",
+ "fe80::d210:cf3f:64e7:5423",
+ ]
+ assert ips == network.filter_by_networks(ips, ["0.0.0.0/0", "::/0"])
--
2.23.0