File add-ip-filtering-by-network.patch of Package salt

From a8615ab8f3debdc5962ecda5c52a432987bde02a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcus=20R=C3=BCckert?= <darix@nordisch.org>
Date: Thu, 20 Feb 2020 17:13:31 +0100
Subject: [PATCH] Add IP filtering by network

IPs are filtered out if they don't belong to any of the given networks.
If `None` is passed as the network, all IPs are returned. An empty list
rejects all IPs.

Example:

	{% set networks = ['192.168.0.0/24', 'fe80::/64'] %}
	{{ grains['ip_interfaces'] | filter_by_networks(networks) }}
	{{ grains['ipv6'] | filter_by_networks(networks) }}
	{{ grains['ipv4'] | filter_by_networks(networks) }}

Fixes #212
Co-authored-by: Alexander Graul <agraul@suse.com>

Add unit tests for filter_by_networks
---
 salt/utils/network.py            | 35 +++++++++++++++++++++-
 tests/unit/utils/test_network.py | 50 ++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/salt/utils/network.py b/salt/utils/network.py
index def997f3dc36654efacff00bf14d00c43c7ff7d9..09fb0ac2346a27e5383fc2904b85011e467e5fb8 100644
--- a/salt/utils/network.py
+++ b/salt/utils/network.py
@@ -5,7 +5,9 @@ Define some generic socket functions for network modules
 '''
 
 # Import python libs
-from __future__ import absolute_import, unicode_literals, print_function
+from __future__ import absolute_import, print_function, unicode_literals
+
+import collections
 import itertools
 import os
 import re
@@ -1987,3 +1989,34 @@ def is_fqdn(hostname):
 
     compliant = re.compile(r"(?!-)[A-Z\d\-\_]{1,63}(?<!-)$", re.IGNORECASE)
     return "." in hostname and len(hostname) < 0xff and all(compliant.match(x) for x in hostname.rstrip(".").split("."))
+
+
+@jinja_filter("filter_by_networks")
+def filter_by_networks(values, networks):
+    """
+    Returns the list of IPs filtered by the network list.
+    If the network list is an empty sequence, no IPs are returned.
+    If the network list is None, all IPs are returned.
+
+    {% set networks = ['192.168.0.0/24', 'fe80::/64'] %}
+    {{ grains['ip_interfaces'] | filter_by_networks(networks) }}
+    {{ grains['ipv6'] | filter_by_networks(networks) }}
+    {{ grains['ipv4'] | filter_by_networks(networks) }}
+    """
+
+    _filter = lambda ips, networks: [
+        ip for ip in ips for net in networks if ipaddress.ip_address(ip) in net
+    ]
+
+    if networks is not None:
+        networks = [ipaddress.ip_network(network) for network in networks]
+        if isinstance(values, collections.Mapping):
+            return {
+                interface: _filter(values[interface], networks) for interface in values
+            }
+        elif isinstance(values, collections.Sequence):
+            return _filter(values, networks)
+        else:
+            raise ValueError("Do not know how to filter a {}".format(type(values)))
+    else:
+        return values
diff --git a/tests/unit/utils/test_network.py b/tests/unit/utils/test_network.py
index 74479b0cae87a33b0ecc9648f568c4d87bb6e445..b734f862f68edbd6cf25658ff1a2d25a0c2f4383 100644
--- a/tests/unit/utils/test_network.py
+++ b/tests/unit/utils/test_network.py
@@ -13,6 +13,8 @@ from tests.support.mock import (
     create_autospec,
     patch,
 )
+import pytest
+import salt.exceptions
 
 # Import salt libs
 import salt.utils.network as network
@@ -725,3 +727,51 @@ class NetworkTestCase(TestCase):
         """
         for fqdn in ["hostname", "/some/path", "$variable.here", "verylonghostname.{}".format("domain" * 45)]:
             assert not network.is_fqdn(fqdn)
+
+    def test_filter_by_networks_with_no_filter(self):
+        ips = ["10.0.123.200", "10.10.10.10"]
+        with pytest.raises(TypeError):
+            network.filter_by_networks(ips)  # pylint: disable=no-value-for-parameter
+
+    def test_filter_by_networks_empty_filter(self):
+        ips = ["10.0.123.200", "10.10.10.10"]
+        assert network.filter_by_networks(ips, []) == []
+
+    def test_filter_by_networks_ips_list(self):
+        ips = [
+            "10.0.123.200",
+            "10.10.10.10",
+            "193.124.233.5",
+            "fe80::d210:cf3f:64e7:5423",
+        ]
+        networks = ["10.0.0.0/8", "fe80::/64"]
+        assert network.filter_by_networks(ips, networks) == [
+            "10.0.123.200",
+            "10.10.10.10",
+            "fe80::d210:cf3f:64e7:5423",
+        ]
+
+    def test_filter_by_networks_interfaces_dict(self):
+        interfaces = {
+            "wlan0": ["192.168.1.100", "217.5.140.67", "2001:db8::ff00:42:8329"],
+            "eth0": [
+                "2001:0DB8:0:CD30:123:4567:89AB:CDEF",
+                "192.168.1.101",
+                "10.0.123.201",
+            ],
+        }
+        assert network.filter_by_networks(
+            interfaces, ["192.168.1.0/24", "2001:db8::/48"]
+        ) == {
+            "wlan0": ["192.168.1.100", "2001:db8::ff00:42:8329"],
+            "eth0": ["2001:0DB8:0:CD30:123:4567:89AB:CDEF", "192.168.1.101"],
+        }
+
+    def test_filter_by_networks_catch_all(self):
+        ips = [
+            "10.0.123.200",
+            "10.10.10.10",
+            "193.124.233.5",
+            "fe80::d210:cf3f:64e7:5423",
+        ]
+        assert ips == network.filter_by_networks(ips, ["0.0.0.0/0", "::/0"])
-- 
2.23.0


openSUSE Build Service is sponsored by