File 0001-Disallow-router-interface-out-of-subnet-IP-range.patch of Package openstack-neutron

From 1e38aafa84e0908d461bf3fc85d414306f2ea858 Mon Sep 17 00:00:00 2001
From: Miguel Lavalle <miguel.lavalle@huawei.com>
Date: Thu, 14 Jun 2018 09:21:09 -0500
Subject: [PATCH] Disallow router interface out of subnet IP range

Currently, a non privileged tenant can add a router interface to a
shared / external network's subnet with an IP address outside the
subnet's allocation pool, creating a security risk. This patch prevents
tenants who are not the subnet's owner or admin from assigning a router
interface an IP address outside the subnet's allocation pool.

Upstream-ref: https://review.opendev.org/#/c/584326/
Change-Id: I32e76a83443dd8e7d79b396499747f29b4762e92
Closes-Bug: #1757482
(cherry picked from commit c1d2f13495b2eb925be6495840795ead5929fd0e)
---
 neutron/db/l3_db.py                      | 26 +++++++++++++++
 neutron/tests/unit/extensions/test_l3.py | 55 ++++++++++++++++++++++++++++++++
 2 files changed, 81 insertions(+)

diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py
index 9d2b48c45a..7a7c01e55f 100644
--- a/neutron/db/l3_db.py
+++ b/neutron/db/l3_db.py
@@ -697,6 +697,27 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
                                              port['network_id'])
         return port
 
+    def _validate_port_in_range_or_admin(self, context, subnets, port):
+        if context.is_admin:
+            return
+        subnets_by_id = {}
+        for s in subnets:
+            addr_set = netaddr.IPSet()
+            for range in s['allocation_pools']:
+                addr_set.add(netaddr.IPRange(netaddr.IPAddress(range['start']),
+                                             netaddr.IPAddress(range['end'])))
+            subnets_by_id[s['id']] = (addr_set, s['project_id'],)
+        for subnet_id, ip in [(fix_ip['subnet_id'], fix_ip['ip_address'],)
+                              for fix_ip in port['fixed_ips']]:
+            if (ip not in subnets_by_id[subnet_id][0] and
+                    context.project_id != subnets_by_id[subnet_id][1]):
+                msg = (_('Cannot add interface to router because specified '
+                         'port %(port)s has an IP address out of the '
+                         'allocation pool of subnet %(subnet)s, which is not '
+                         'owned by the project making the request') %
+                       {'port': port['id'], 'subnet': subnet_id})
+                raise n_exc.BadRequest(resource='router', msg=msg)
+
     def _validate_router_port_info(self, context, router, port_id):
         with db_api.autonested_transaction(context.session):
             # check again within transaction to mitigate race
@@ -732,6 +753,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
                 msg = _("Cannot have multiple "
                         "IPv4 subnets on router port")
                 raise n_exc.BadRequest(resource='router', msg=msg)
+            self._validate_port_in_range_or_admin(context, subnets, port)
             return port, subnets
 
     def _notify_attaching_interface(self, context, router_id, network_id):
@@ -787,6 +809,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
         if not subnet['gateway_ip']:
             msg = _('Subnet for router interface must have a gateway IP')
             raise n_exc.BadRequest(resource='router', msg=msg)
+        if subnet['project_id'] != context.project_id and not context.is_admin:
+            msg = (_('Cannot add interface to router because subnet %s is not '
+                     'owned by project making the request') % subnet_id)
+            raise n_exc.BadRequest(resource='router', msg=msg)
         if (subnet['ip_version'] == 6 and subnet['ipv6_ra_mode'] is None
                 and subnet['ipv6_address_mode'] is not None):
             msg = (_('IPv6 subnet %s configured to receive RAs from an '
diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py
index 7380690cc9..76062c4666 100644
--- a/neutron/tests/unit/extensions/test_l3.py
+++ b/neutron/tests/unit/extensions/test_l3.py
@@ -1198,6 +1198,61 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
                                                   expected_code=err_code,
                                                   tenant_id='bad_tenant')
 
+    def test_router_add_interface_by_subnet_other_tenant_subnet_returns_400(
+            self):
+        router_tenant_id = _uuid()
+        with self.router(tenant_id=router_tenant_id, set_context=True) as r:
+            with self.network(shared=True) as n:
+                with self.subnet(network=n) as s:
+                    err_code = exc.HTTPBadRequest.code
+                    self._router_interface_action('add',
+                                                  r['router']['id'],
+                                                  s['subnet']['id'],
+                                                  None,
+                                                  expected_code=err_code,
+                                                  tenant_id=router_tenant_id)
+
+    def _test_router_add_interface_by_port_allocation_pool(
+            self, out_of_pool=False, router_action_as_admin=False,
+            expected_code=exc.HTTPOk.code):
+        router_tenant_id = _uuid()
+        with self.router(tenant_id=router_tenant_id, set_context=True) as r:
+            with self.network(shared=True) as n:
+                with self.subnet(network=n) as s1, (
+                     self.subnet(network=n, cidr='fd00::/64',
+                                 ip_version=6)) as s2, (
+                     self.subnet(network=n, cidr='fd01::/64',
+                                 ip_version=6)) as s3:
+                    fixed_ips = [{'subnet_id': s1['subnet']['id']},
+                                 {'subnet_id': s2['subnet']['id']},
+                                 {'subnet_id': s3['subnet']['id']}]
+                    if out_of_pool:
+                        fixed_ips[1] = {'subnet_id': s2['subnet']['id'],
+                                        'ip_address':
+                                            s2['subnet']['gateway_ip']}
+                    with self.port(subnet=s1, fixed_ips=fixed_ips,
+                                   tenant_id=router_tenant_id) as p:
+                        kwargs = {'expected_code': expected_code}
+                        if not router_action_as_admin:
+                            kwargs['tenant_id'] = router_tenant_id
+                        self._router_interface_action(
+                            'add', r['router']['id'], None, p['port']['id'],
+                            **kwargs)
+
+    def test_router_add_interface_by_port_other_tenant_address_in_pool(
+            self):
+        self._test_router_add_interface_by_port_allocation_pool()
+
+    def test_router_add_interface_by_port_other_tenant_address_out_of_pool(
+            self):
+        self._test_router_add_interface_by_port_allocation_pool(
+            out_of_pool=True, expected_code=exc.HTTPBadRequest.code)
+
+    def test_router_add_interface_by_port_admin_address_out_of_pool(
+            self):
+        self._test_router_add_interface_by_port_allocation_pool(
+            out_of_pool=True, router_action_as_admin=True)
+
     def test_router_add_interface_subnet_with_port_from_other_tenant(self):
         tenant_id = _uuid()
         other_tenant_id = _uuid()
-- 
2.16.4

openSUSE Build Service is sponsored by