File 0001-Check-VMDK-subformat-against-an-allowed-list.patch of Package openstack-cinder

From 3fa26eb41e21994a3e7cd0e674a190c802fd2397 Mon Sep 17 00:00:00 2001
From: Brian Rosmaita <rosmaita.fossdev@gmail.com>
Date: Sat, 14 Jan 2023 07:06:27 -0500
Subject: [PATCH] Check VMDK subformat against an allowed list

Also add a more general check to convert_image that the image format
reported by qemu-img matches what the caller says it is.

Change-Id: I3c60ee4c0795aadf03108ed9b5a46ecd116894af
Partial-bug: #1996188
(cherry picked from commit 930fc93e9fda82a4aa4568ae149c3c80af7379d0)
(cherry picked from commit ba37dc2ead69c08d7ede242295ff997086e6121d)
Conflicts:
  cinder/image/image_utils.py
   - changed type annotations to use implicit Optional to be
     consistent with cinder yoga mypy usage
   - removed refs to image_conversion_disable in tests
(cherry picked from commit 2ae5d53526e2b224d81b3259140c59aba97d72c3)
(cherry picked from commit e96415409daa158cc503c1adec1ca1ca4f940082)
Conflicts:
  cinder/image/image_utils.py
   - removed type annotations
   - restored wallaby-era fetch_verify_image() function signature
(cherry picked from commit be11d54ac420e0ebc0da6917d7ffc3af59b40f24)
Conflicts:
  cinder/tests/unit/test_image_utils.py
   - did not include extraneous test from be11d54ac's parent commit
Additions:
  cinder/image/image_utils.py
   - added code to handle oslo.utils<4.1.0,>=3.14.0
  cinder/tests/unit/test_image_utils.py
   - added a test for ^^

(cherry picked from commit 94a8c291bb4856d2611b7326462720eadf61d9b7)
---
 cinder/image/image_utils.py                        | 170 ++++++++-
 cinder/tests/unit/test_image_utils.py              | 383 +++++++++++++++++++--
 ...vmdk-subformat-allow-list-93e6943d9a486d11.yaml |  33 ++
 3 files changed, 549 insertions(+), 37 deletions(-)
 create mode 100644 releasenotes/notes/bug-1996188-vmdk-subformat-allow-list-93e6943d9a486d11.yaml

diff --git a/cinder/image/image_utils.py b/cinder/image/image_utils.py
index ea59ce4..7761512 100644
--- a/cinder/image/image_utils.py
+++ b/cinder/image/image_utils.py
@@ -53,10 +53,21 @@ from cinder.volume import utils as volume_utils
 
 LOG = logging.getLogger(__name__)
 
-image_helper_opts = [cfg.StrOpt('image_conversion_dir',
-                                default='$state_path/conversion',
-                                help='Directory used for temporary storage '
-                                'during image conversion'), ]
+image_helper_opts = [
+    cfg.StrOpt('image_conversion_dir',
+               default='$state_path/conversion',
+               help='Directory used for temporary storage '
+               'during image conversion'),
+    cfg.ListOpt('vmdk_allowed_types',
+                default=['streamOptimized', 'monolithicSparse'],
+                help='A list of strings describing the VMDK createType '
+                'subformats that are allowed.  We recommend that you only '
+                'include single-file-with-sparse-header variants to avoid '
+                'potential host file exposure when processing named extents '
+                'when an image is converted to raw format as it is written '
+                'to a volume.  If this list is empty, no VMDK images are '
+                'allowed.'),
+]
 
 CONF = cfg.CONF
 CONF.register_opts(image_helper_opts)
@@ -93,7 +104,7 @@ def from_qemu_img_disk_format(disk_format):
 
 def qemu_img_info(path, run_as_root=True, force_share=False):
     """Return an object containing the parsed output from qemu-img info."""
-    cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info']
+    cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info', '--output=json']
     if force_share:
         if qemu_img_supports_force_share():
             cmd.append('--force-share')
@@ -107,7 +118,7 @@ def qemu_img_info(path, run_as_root=True, force_share=False):
         cmd = cmd[2:]
     out, _err = utils.execute(*cmd, run_as_root=run_as_root,
                               prlimit=QEMU_IMG_LIMITS)
-    info = imageutils.QemuImgInfo(out)
+    info = imageutils.QemuImgInfo(out, format='json')
 
     # From Cinder's point of view, any 'luks' formatted images
     # should be treated as 'raw'.
@@ -274,7 +285,31 @@ def _convert_image(prefix, source, dest, out_format,
 
 def convert_image(source, dest, out_format, out_subformat=None,
                   src_format=None, run_as_root=True, throttle=None,
-                  cipher_spec=None, passphrase_file=None):
+                  cipher_spec=None, passphrase_file=None,
+                  image_id=None, data=None):
+    """Convert image to other format.
+
+    NOTE: If the qemu-img convert command fails and this function raises an
+    exception, a non-empty dest file may be left in the filesystem.
+    It is the responsibility of the caller to decide what to do with this file.
+
+    :param source: source filename
+    :param dest: destination filename
+    :param out_format: output image format of qemu-img
+    :param out_subformat: output image subformat
+    :param src_format: source image format (use image_utils.fixup_disk_format()
+           to translate from a Glance format to one recognizable by qemu_img)
+    :param run_as_root: run qemu-img as root
+    :param throttle: a cinder.throttling.Throttle object, or None
+    :param cipher_spec: encryption details
+    :param passphrase_file: filename containing luks passphrase
+    :param image_id: the image ID if this is a Glance image, or None
+    :param data: a imageutils.QemuImgInfo object from this image, or None
+    :raises ImageUnacceptable: when the image fails some format checks
+    :raises ProcessExecutionError: when something goes wrong during conversion
+    """
+    check_image_format(source, src_format, image_id, data, run_as_root)
+
     if not throttle:
         throttle = throttling.Throttle.get_default()
     with throttle.subcommand(source, dest) as throttle_cmd:
@@ -449,6 +484,97 @@ def get_qemu_data(image_id, has_meta, disk_format_raw, dest, run_as_root,
     return data
 
 
+def check_vmdk_image(image_id, data):
+    """Check some rules about VMDK images.
+
+    Make sure the VMDK subformat (the "createType" in vmware docs)
+    is one that we allow as determined by the 'vmdk_allowed_types'
+    configuration option.  The default set includes only types that
+    do not reference files outside the VMDK file, which can otherwise
+    be used in exploits to expose host information.
+
+    :param image_id: the image id
+    :param data: an imageutils.QemuImgInfo object
+    :raises ImageUnacceptable: when the VMDK createType is not in the
+                               allowed list
+    """
+    allowed_types = CONF.vmdk_allowed_types
+
+    if not len(allowed_types):
+        msg = _('Image is a VMDK, but no VMDK createType is allowed')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+
+    try:
+        create_type = data.format_specific['data']['create-type']
+    except KeyError:
+        msg = _('Unable to determine VMDK createType')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+    except TypeError:
+        msg = _('Unable to determine VMDK createType as no format-specific '
+                'information is available')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+
+    if create_type not in allowed_types:
+        LOG.warning('Refusing to process VMDK file with createType of %r '
+                    'which is not in allowed set of: %s', create_type,
+                    ','.join(allowed_types))
+        msg = _('Invalid VMDK create-type specified')
+        raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
+
+
+def check_image_format(source,
+                       src_format=None,
+                       image_id=None,
+                       data=None,
+                       run_as_root=True):
+    """Do some image format checks.
+
+    Verifies that the src_format matches what qemu-img thinks the image
+    format is, and does some vmdk subformat checks.  See Bug #1996188.
+
+    - Does not check for a qcow2 backing file.
+    - Will make a call out to qemu_img if data is None.
+
+    :param source: filename of the image to check
+    :param src_format: source image format recognized by qemu_img, or None
+    :param image_id: the image ID if this is a Glance image, or None
+    :param data: a imageutils.QemuImgInfo object from this image, or None
+    :param run_as_root: when 'data' is None, call 'qemu-img info' as root
+    :raises ImageUnacceptable: when the image fails some format checks
+    :raises ProcessExecutionError: if 'qemu-img info' fails
+    """
+    if image_id is None:
+        image_id = 'internal image'
+    if data is None:
+        data = qemu_img_info(source, run_as_root=run_as_root)
+    if data.file_format is None:
+        raise exception.ImageUnacceptable(
+            reason=_("'qemu-img info' parsing failed."),
+            image_id=image_id)
+
+    if src_format is not None:
+        if src_format.lower() == 'ami':
+            # qemu-img doesn't recognize AMI format; see change Icde4c0f936ce.
+            # We also use lower() here (though nowhere else) to be consistent
+            # with that change.
+            pass
+        elif data.file_format != src_format:
+            LOG.debug("Rejecting image %(image_id)s due to format mismatch. "
+                      "src_format: '%(src)s', but qemu-img info reports: "
+                      "'%(qemu)s'",
+                      {'image_id': image_id,
+                       'src': src_format,
+                       'qemu': data.file_format})
+            msg = _("The image format was claimed to be '%(src)s' but the "
+                    "image data appears to be in a different format.")
+            raise exception.ImageUnacceptable(
+                image_id=image_id,
+                reason=(msg % {'src': src_format}))
+
+    if data.file_format == 'vmdk':
+        check_vmdk_image(image_id, data)
+
+
 def fetch_verify_image(context, image_service, image_id, dest,
                        user_id=None, project_id=None, size=None,
                        run_as_root=True):
@@ -465,7 +591,10 @@ def fetch_verify_image(context, image_service, image_id, dest,
         data = get_qemu_data(image_id, has_meta, format_raw,
                              dest, run_as_root)
         # We can only really do verification of the image if we have
-        # qemu data to use
+        # qemu data to use.
+        # NOTE: We won't have data if qemu_img is not installed *and* the
+        # disk_format recorded in Glance is raw (otherwise an ImageUnacceptable
+        # would have been raised already).  So this isn't as bad as it looks.
         if data is not None:
             fmt = data.file_format
             if fmt is None:
@@ -486,6 +615,11 @@ def fetch_verify_image(context, image_service, image_id, dest,
             if size is not None:
                 check_virtual_size(data.virtual_size, size, image_id)
 
+            # a VMDK can have a backing file, but we have to check for
+            # it differently
+            if fmt == 'vmdk':
+                check_vmdk_image(image_id, data)
+
 
 def fetch_to_vhd(context, image_service,
                  image_id, dest, blocksize, volume_subformat=None,
@@ -521,6 +655,10 @@ def fetch_to_volume_format(context, image_service,
             format_raw = True if image_meta['disk_format'] == 'raw' else False
         except TypeError:
             format_raw = False
+
+        # Probe using the empty tmp file to see if qemu-img is available.
+        # If it's not, and the disk_format recorded in Glance is not 'raw',
+        # this will raise ImageUnacceptable
         data = get_qemu_data(image_id, has_meta, format_raw,
                              tmp, run_as_root)
         if data is None:
@@ -575,13 +713,21 @@ def fetch_to_volume_format(context, image_service,
         # check via 'qemu-img info' that what we copied was in fact a raw
         # image and not a different format with a backing file, which may be
         # malicious.
-        LOG.debug("%s was %s, converting to %s ", image_id, fmt, volume_format)
+        # FIXME: revisit the above 2 comments.  We already have an exception
+        # above for RAW format images when qemu-img is not available, and I'm
+        # pretty sure that the backing file exploit only happens when
+        # converting from some format that supports a backing file TO raw ...
+        # a bit-for-bit copy of a qcow2 with backing file will copy the backing
+        # file *reference* but not its *content*.
         disk_format = fixup_disk_format(image_meta['disk_format'])
+        LOG.debug("%s was %s, converting to %s ", image_id, fmt, volume_format)
 
         convert_image(tmp, dest, volume_format,
                       out_subformat=volume_subformat,
                       src_format=disk_format,
-                      run_as_root=run_as_root)
+                      run_as_root=run_as_root,
+                      image_id=image_id,
+                      data=data)
 
 
 def _validate_file_format(image_data, expected_format):
@@ -629,7 +775,9 @@ def upload_volume(context, image_service, image_meta, volume_path,
 
         out_format = fixup_disk_format(image_meta['disk_format'])
         convert_image(volume_path, tmp, out_format,
-                      run_as_root=run_as_root)
+                      run_as_root=run_as_root,
+                      image_id=image_id,
+                      data=data)
 
         data = qemu_img_info(tmp, run_as_root=run_as_root)
         if data.file_format != out_format:
diff --git a/cinder/tests/unit/test_image_utils.py b/cinder/tests/unit/test_image_utils.py
index 3588b00..2ea1a0e 100644
--- a/cinder/tests/unit/test_image_utils.py
+++ b/cinder/tests/unit/test_image_utils.py
@@ -21,6 +21,7 @@ import cryptography
 import ddt
 import mock
 from oslo_concurrency import processutils
+from oslo_utils import imageutils
 from oslo_utils import units
 from six.moves import builtins
 
@@ -43,7 +44,8 @@ class TestQemuImgInfo(test.TestCase):
 
         output = image_utils.qemu_img_info(test_path)
         mock_exec.assert_called_once_with('env', 'LC_ALL=C', 'qemu-img',
-                                          'info', test_path, run_as_root=True,
+                                          'info', '--output=json', test_path,
+                                          run_as_root=True,
                                           prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
 
@@ -60,7 +62,8 @@ class TestQemuImgInfo(test.TestCase):
                                            force_share=False,
                                            run_as_root=False)
         mock_exec.assert_called_once_with('env', 'LC_ALL=C', 'qemu-img',
-                                          'info', test_path, run_as_root=False,
+                                          'info', '--output=json', test_path,
+                                          run_as_root=False,
                                           prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
 
@@ -75,8 +78,8 @@ class TestQemuImgInfo(test.TestCase):
         mock_os.name = 'nt'
 
         output = image_utils.qemu_img_info(test_path)
-        mock_exec.assert_called_once_with('qemu-img', 'info', test_path,
-                                          run_as_root=True,
+        mock_exec.assert_called_once_with('qemu-img', 'info', '--output=json',
+                                          test_path, run_as_root=True,
                                           prlimit=image_utils.QEMU_IMG_LIMITS)
         self.assertEqual(mock_info.return_value, output)
 
@@ -166,7 +169,8 @@ class TestConvertImage(test.TestCase):
         source = mock.sentinel.source
         dest = mock.sentinel.dest
         out_format = mock.sentinel.out_format
-        mock_info.side_effect = ValueError
+        mock_info.return_value.file_format = 'qcow2'
+        mock_info.return_value.virtual_size = 1048576
         throttle = throttling.Throttle(prefix=['cgcmd'])
 
         with mock.patch('cinder.volume.utils.check_for_odirect_support',
@@ -174,7 +178,8 @@ class TestConvertImage(test.TestCase):
             output = image_utils.convert_image(source, dest, out_format,
                                                throttle=throttle)
 
-            mock_info.assert_called_once_with(source, run_as_root=True)
+            my_call = mock.call(source, run_as_root=True)
+            mock_info.assert_has_calls([my_call, my_call])
             self.assertIsNone(output)
             mock_exec.assert_called_once_with('cgcmd', 'qemu-img', 'convert',
                                               '-O', out_format, '-t', 'none',
@@ -230,7 +235,6 @@ class TestConvertImage(test.TestCase):
         dest = mock.sentinel.dest
         out_format = mock.sentinel.out_format
         out_subformat = 'fake_subformat'
-        mock_info.side_effect = ValueError
 
         output = image_utils.convert_image(source, dest, out_format,
                                            out_subformat=out_subformat)
@@ -692,7 +696,9 @@ class TestUploadVolume(test.TestCase):
         mock_convert.assert_called_once_with(volume_path,
                                              temp_file,
                                              output_format,
-                                             run_as_root=True)
+                                             run_as_root=True,
+                                             image_id=image_meta['id'],
+                                             data=data)
         mock_info.assert_called_with(temp_file, run_as_root=True)
         self.assertEqual(2, mock_info.call_count)
         mock_open.assert_called_once_with(temp_file, 'rb')
@@ -732,6 +738,7 @@ class TestUploadVolume(test.TestCase):
         image_service.update.assert_called_once_with(
             ctxt, image_meta['id'], {}, mock_proxy.return_value)
 
+
     @mock.patch('eventlet.tpool.Proxy')
     @mock.patch('cinder.image.image_utils.utils.temporary_chown')
     @mock.patch('cinder.image.image_utils.CONF')
@@ -763,6 +770,7 @@ class TestUploadVolume(test.TestCase):
         image_service.update.assert_called_once_with(
             ctxt, image_meta['id'], {}, mock_proxy.return_value)
 
+
     @mock.patch('cinder.image.image_utils.CONF')
     @mock.patch('six.moves.builtins.open')
     @mock.patch('cinder.image.image_utils.qemu_img_info')
@@ -788,7 +796,9 @@ class TestUploadVolume(test.TestCase):
         mock_convert.assert_called_once_with(volume_path,
                                              temp_file,
                                              mock.sentinel.disk_format,
-                                             run_as_root=True)
+                                             run_as_root=True,
+                                             image_id=image_meta['id'],
+                                             data=data)
         mock_info.assert_called_with(temp_file, run_as_root=True)
         self.assertEqual(2, mock_info.call_count)
         self.assertFalse(image_service.update.called)
@@ -920,8 +930,10 @@ class TestFetchToVolumeFormat(test.TestCase):
         out_subformat = None
         blocksize = mock.sentinel.blocksize
 
+        disk_format = 'raw'
+
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = disk_format
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock_temp.return_value.__enter__.return_value
@@ -942,7 +954,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=out_subformat,
                                              run_as_root=True,
-                                             src_format='raw')
+                                             src_format=disk_format,
+                                             image_id=image_id,
+                                             data=data)
 
     @mock.patch('cinder.image.image_utils.check_virtual_size')
     @mock.patch('cinder.image.image_utils.check_available_space')
@@ -960,7 +974,9 @@ class TestFetchToVolumeFormat(test.TestCase):
                     mock_is_xen, mock_repl_xen, mock_copy, mock_convert,
                     mock_check_space, mock_check_size):
         ctxt = mock.sentinel.context
-        image_service = FakeImageService()
+        disk_format = 'ploop'
+        qemu_img_format = image_utils.QEMU_IMG_FORMAT_MAP[disk_format]
+        image_service = FakeImageService(disk_format=disk_format)
         image_id = mock.sentinel.image_id
         dest = mock.sentinel.dest
         volume_format = mock.sentinel.volume_format
@@ -972,7 +988,7 @@ class TestFetchToVolumeFormat(test.TestCase):
         run_as_root = mock.sentinel.run_as_root
 
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = qemu_img_format
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock_temp.return_value.__enter__.return_value
@@ -994,7 +1010,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=out_subformat,
                                              run_as_root=run_as_root,
-                                             src_format='raw')
+                                             src_format=qemu_img_format,
+                                             image_id=image_id,
+                                             data=data)
         mock_check_size.assert_called_once_with(data.virtual_size,
                                                 size, image_id)
 
@@ -1025,12 +1043,14 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
+        disk_format = 'vhd'
+
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = image_utils.QEMU_IMG_FORMAT_MAP[disk_format]
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock_temp.return_value.__enter__.return_value
-        image_service = FakeImageService(disk_format='vhd')
+        image_service = FakeImageService(disk_format=disk_format)
         expect_format = 'vpc'
 
         output = image_utils.fetch_to_volume_format(
@@ -1050,7 +1070,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=out_subformat,
                                              run_as_root=run_as_root,
-                                             src_format=expect_format)
+                                             src_format=expect_format,
+                                             image_id=image_id,
+                                             data=data)
 
     @mock.patch('cinder.image.image_utils.check_virtual_size')
     @mock.patch('cinder.image.image_utils.check_available_space')
@@ -1077,12 +1099,14 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
+        disk_format = 'iso'
+
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = image_utils.QEMU_IMG_FORMAT_MAP[disk_format]
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock_temp.return_value.__enter__.return_value
-        image_service = FakeImageService(disk_format='iso')
+        image_service = FakeImageService(disk_format=disk_format)
         expect_format = 'raw'
 
         output = image_utils.fetch_to_volume_format(
@@ -1101,7 +1125,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=out_subformat,
                                              run_as_root=run_as_root,
-                                             src_format=expect_format)
+                                             src_format=expect_format,
+                                             image_id=image_id,
+                                             data=data)
 
     @mock.patch('cinder.image.image_utils.check_available_space',
                 new=mock.Mock())
@@ -1120,7 +1146,9 @@ class TestFetchToVolumeFormat(test.TestCase):
                               mock_copy, mock_convert):
         ctxt = mock.sentinel.context
         ctxt.user_id = mock.sentinel.user_id
-        image_service = FakeImageService()
+        disk_format = 'ploop'
+        qemu_img_format = image_utils.QEMU_IMG_FORMAT_MAP[disk_format]
+        image_service = FakeImageService(disk_format=disk_format)
         image_id = mock.sentinel.image_id
         dest = mock.sentinel.dest
         volume_format = mock.sentinel.volume_format
@@ -1128,7 +1156,7 @@ class TestFetchToVolumeFormat(test.TestCase):
         blocksize = mock.sentinel.blocksize
 
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = qemu_img_format
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock.sentinel.tmp
@@ -1156,7 +1184,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=out_subformat,
                                              run_as_root=True,
-                                             src_format='raw')
+                                             src_format=qemu_img_format,
+                                             image_id=image_id,
+                                             data=data)
 
     @mock.patch('cinder.image.image_utils.convert_image')
     @mock.patch('cinder.image.image_utils.volume_utils.copy_volume')
@@ -1461,7 +1491,9 @@ class TestFetchToVolumeFormat(test.TestCase):
                               mock_copy, mock_convert, mock_check_space,
                               mock_check_size):
         ctxt = mock.sentinel.context
-        image_service = FakeImageService()
+        disk_format = 'vhd'
+        qemu_img_format = image_utils.QEMU_IMG_FORMAT_MAP[disk_format]
+        image_service = FakeImageService(disk_format=disk_format)
         image_id = mock.sentinel.image_id
         dest = mock.sentinel.dest
         volume_format = mock.sentinel.volume_format
@@ -1472,7 +1504,7 @@ class TestFetchToVolumeFormat(test.TestCase):
         run_as_root = mock.sentinel.run_as_root
 
         data = mock_info.return_value
-        data.file_format = volume_format
+        data.file_format = qemu_img_format
         data.backing_file = None
         data.virtual_size = 1234
         tmp = mock_temp.return_value.__enter__.return_value
@@ -1494,7 +1526,9 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
                                              out_subformat=None,
                                              run_as_root=run_as_root,
-                                             src_format='raw')
+                                             src_format=qemu_img_format,
+                                             image_id=image_id,
+                                             data=data)
 
     @mock.patch('cinder.image.image_utils.fetch')
     @mock.patch('cinder.image.image_utils.qemu_img_info',
@@ -1874,3 +1908,300 @@ class TestImageUtils(test.TestCase):
                     'ivgen_alg': 'essiv'}
         result = image_utils.decode_cipher('aes-xts-essiv', 256)
         self.assertEqual(expected, result)
+
+
+@ddt.ddt
+class TestVmdkImageChecks(test.TestCase):
+    def setUp(self):
+        super(TestVmdkImageChecks, self).setUp()
+        # Test data from:
+        # $ qemu-img create -f vmdk fake.vmdk 1M -o subformat=monolithicSparse
+        # $ qemu-img info -f vmdk --output=json fake.vmdk
+        #
+        # What qemu-img calls the "subformat" is called the "createType" in
+        # vmware-speak and it's found at "/format-specific/data/create-type".
+        qemu_img_info = '''
+        {
+            "virtual-size": 1048576,
+            "filename": "fake.vmdk",
+            "cluster-size": 65536,
+            "format": "vmdk",
+            "actual-size": 12288,
+            "format-specific": {
+                "type": "vmdk",
+                "data": {
+                    "cid": 1200165687,
+                    "parent-cid": 4294967295,
+                    "create-type": "monolithicSparse",
+                    "extents": [
+                        {
+                            "virtual-size": 1048576,
+                            "filename": "fake.vmdk",
+                            "cluster-size": 65536,
+                            "format": ""
+                        }
+                    ]
+                }
+            },
+            "dirty-flag": false
+        }'''
+        self.qdata = imageutils.QemuImgInfo(qemu_img_info, format='json')
+        self.qdata_data = self.qdata.format_specific['data']
+        # we will populate this in each test
+        self.qdata_data["create-type"] = None
+
+    @ddt.data('monolithicSparse', 'streamOptimized')
+    def test_check_vmdk_image_default_config(self, subformat):
+        # none of these should raise
+        self.qdata_data["create-type"] = subformat
+        image_utils.check_vmdk_image(fake.IMAGE_ID, self.qdata)
+
+    @ddt.data('monolithicFlat', 'twoGbMaxExtentFlat')
+    def test_check_vmdk_image_negative_default_config(self, subformat):
+        self.qdata_data["create-type"] = subformat
+        self.assertRaises(exception.ImageUnacceptable,
+                          image_utils.check_vmdk_image,
+                          fake.IMAGE_ID,
+                          self.qdata)
+
+    def test_check_vmdk_image_handles_missing_info(self):
+        expected = 'Unable to determine VMDK createType'
+        # remove create-type
+        del(self.qdata_data['create-type'])
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.check_vmdk_image,
+                                fake.IMAGE_ID,
+                                self.qdata)
+        self.assertIn(expected, str(iue))
+
+        # remove entire data section
+        del(self.qdata_data)
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.check_vmdk_image,
+                                fake.IMAGE_ID,
+                                self.qdata)
+        self.assertIn(expected, str(iue))
+
+        # oslo.utils.imageutils guarantees that format_specific is
+        # defined, so let's see what happens when it's empty
+        self.qdata.format_specific = None
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.check_vmdk_image,
+                                fake.IMAGE_ID,
+                                self.qdata)
+        self.assertIn('no format-specific information is available', str(iue))
+
+    def test_check_vmdk_image_positive(self):
+        allowed = 'twoGbMaxExtentFlat'
+        self.flags(vmdk_allowed_types=['garbage', allowed])
+        self.qdata_data["create-type"] = allowed
+        image_utils.check_vmdk_image(fake.IMAGE_ID, self.qdata)
+
+    @ddt.data('monolithicSparse', 'streamOptimized')
+    def test_check_vmdk_image_negative(self, subformat):
+        allow_list = ['vmfs', 'filler']
+        self.assertNotIn(subformat, allow_list)
+        self.flags(vmdk_allowed_types=allow_list)
+        self.qdata_data["create-type"] = subformat
+        self.assertRaises(exception.ImageUnacceptable,
+                          image_utils.check_vmdk_image,
+                          fake.IMAGE_ID,
+                          self.qdata)
+
+    @ddt.data('monolithicSparse', 'streamOptimized', 'twoGbMaxExtentFlat')
+    def test_check_vmdk_image_negative_empty_list(self, subformat):
+        # anything should raise
+        allow_list = []
+        self.flags(vmdk_allowed_types=allow_list)
+        self.qdata_data["create-type"] = subformat
+        self.assertRaises(exception.ImageUnacceptable,
+                          image_utils.check_vmdk_image,
+                          fake.IMAGE_ID,
+                          self.qdata)
+
+    # OK, now that we know the function works properly, let's make sure
+    # it's called in all the situations where Bug #1996188 indicates that
+    # we need this check
+
+    @mock.patch('cinder.image.image_utils.check_vmdk_image')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    @mock.patch('cinder.image.image_utils.fileutils')
+    @mock.patch('cinder.image.image_utils.fetch')
+    def test_vmdk_subformat_checked_fetch_verify_image(
+            self, mock_fetch, mock_fileutils, mock_info, mock_check):
+        ctxt = mock.sentinel.context
+        image_service = mock.Mock()
+        image_id = mock.sentinel.image_id
+        dest = mock.sentinel.dest
+        mock_info.return_value = self.qdata
+        mock_check.side_effect = exception.ImageUnacceptable(
+            image_id=image_id, reason='mock check')
+
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.fetch_verify_image,
+                                ctxt, image_service, image_id, dest)
+        self.assertIn('mock check', str(iue))
+        mock_check.assert_called_with(image_id, self.qdata)
+
+    @mock.patch('cinder.image.image_utils.check_vmdk_image')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    @mock.patch('cinder.image.image_utils.get_qemu_data')
+    def test_vmdk_subformat_checked_fetch_to_volume_format(
+            self, mock_qdata, mock_info, mock_check):
+        ctxt = mock.sentinel.context
+        image_service = mock.Mock()
+        image_meta = {'disk_format': 'vmdk'}
+        image_service.show.return_value = image_meta
+        image_id = mock.sentinel.image_id
+        dest = mock.sentinel.dest
+        volume_format = mock.sentinel.volume_format
+        blocksize = 1024
+        #self.flags(allow_compression_on_image_upload=False)
+        mock_qdata.return_value = self.qdata
+        mock_info.return_value = self.qdata
+        mock_check.side_effect = exception.ImageUnacceptable(
+            image_id=image_id, reason='mock check')
+
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.fetch_to_volume_format,
+                                ctxt,
+                                image_service,
+                                image_id,
+                                dest,
+                                volume_format,
+                                blocksize)
+        self.assertIn('mock check', str(iue))
+        mock_check.assert_called_with(image_id, self.qdata)
+
+    @mock.patch('cinder.image.image_utils.check_vmdk_image')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_vmdk_subformat_checked_upload_volume(
+            self, mock_info, mock_check):
+        ctxt = mock.sentinel.context
+        image_service = mock.Mock()
+        image_meta = {'disk_format': 'vmdk'}
+        image_id = mock.sentinel.image_id
+        image_meta['id'] = image_id
+        #self.flags(allow_compression_on_image_upload=False)
+        mock_info.return_value = self.qdata
+        mock_check.side_effect = exception.ImageUnacceptable(
+            image_id=image_id, reason='mock check')
+
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.upload_volume,
+                                ctxt,
+                                image_service,
+                                image_meta,
+                                volume_path=mock.sentinel.volume_path,
+                                volume_format=mock.sentinel.volume_format)
+        self.assertIn('mock check', str(iue))
+        mock_check.assert_called_with(image_id, self.qdata)
+
+    @mock.patch('cinder.image.image_utils.check_vmdk_image')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_vmdk_checked_convert_image_no_src_format(
+            self, mock_info, mock_check):
+        source = mock.sentinel.source
+        dest = mock.sentinel.dest
+        out_format = mock.sentinel.out_format
+        mock_info.return_value = self.qdata
+        image_id = 'internal image'
+        mock_check.side_effect = exception.ImageUnacceptable(
+            image_id=image_id, reason='mock check')
+
+        self.assertRaises(exception.ImageUnacceptable,
+                          image_utils.convert_image,
+                          source, dest, out_format)
+        mock_check.assert_called_with(image_id, self.qdata)
+
+
+@ddt.ddt
+class TestImageFormatCheck(test.TestCase):
+    def setUp(self):
+        super(TestImageFormatCheck, self).setUp()
+        qemu_img_info = '''
+        {
+            "virtual-size": 1048576,
+            "filename": "whatever.img",
+            "cluster-size": 65536,
+            "format": "qcow2",
+            "actual-size": 200704,
+            "format-specific": {
+                "type": "qcow2",
+                "data": {
+                    "compat": "1.1",
+                    "compression-type": "zlib",
+                    "lazy-refcounts": false,
+                    "refcount-bits": 16,
+                    "corrupt": false,
+                    "extended-l2": false
+                }
+            },
+            "dirty-flag": false
+        }'''
+        self.qdata = imageutils.QemuImgInfo(qemu_img_info, format='json')
+
+    @mock.patch('cinder.image.image_utils.check_vmdk_image')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_check_image_format_defaults(self, mock_info, mock_vmdk):
+        """Doesn't blow up when only the mandatory arg is passed."""
+        src = mock.sentinel.src
+        mock_info.return_value = self.qdata
+        expected_image_id = 'internal image'
+
+        # empty file_format should raise
+        self.qdata.file_format = None
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.check_image_format,
+                                src)
+        self.assertIn(expected_image_id, str(iue))
+        mock_info.assert_called_with(src, run_as_root=True)
+
+        # a VMDK should trigger an additional check
+        mock_info.reset_mock()
+        self.qdata.file_format = 'vmdk'
+        image_utils.check_image_format(src)
+        mock_vmdk.assert_called_with(expected_image_id, self.qdata)
+
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_check_image_format_uses_passed_data(self, mock_info):
+        src = mock.sentinel.src
+        image_utils.check_image_format(src, data=self.qdata)
+        mock_info.assert_not_called()
+
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_check_image_format_mismatch(self, mock_info):
+        src = mock.sentinel.src
+        mock_info.return_value = self.qdata
+        self.qdata.file_format = 'fake_format'
+
+        src_format = 'qcow2'
+        iue = self.assertRaises(exception.ImageUnacceptable,
+                                image_utils.check_image_format,
+                                src,
+                                src_format=src_format)
+        self.assertIn(src_format, str(iue))
+        self.assertIn('different format', str(iue))
+
+    @ddt.data('AMI', 'ami')
+    @mock.patch('cinder.image.image_utils.qemu_img_info')
+    def test_check_image_format_AMI(self, ami, mock_info):
+        """Mismatch OK in this case, see change Icde4c0f936ce."""
+        src = mock.sentinel.src
+        mock_info.return_value = self.qdata
+        self.qdata.file_format = 'raw'
+
+        src_format = ami
+        image_utils.check_image_format(src, src_format=src_format)
+
+    @mock.patch('cinder.image.image_utils._convert_image')
+    @mock.patch('cinder.image.image_utils.check_image_format')
+    def test_check_image_format_called_by_convert_image(
+            self, mock_check, mock__convert):
+        """Make sure the function we've been testing is actually called."""
+        src = mock.sentinel.src
+        dest = mock.sentinel.dest
+        out_fmt = mock.sentinel.out_fmt
+
+        image_utils.convert_image(src, dest, out_fmt)
+        mock_check.assert_called_once_with(src, None, None, None, True)
diff --git a/releasenotes/notes/bug-1996188-vmdk-subformat-allow-list-93e6943d9a486d11.yaml b/releasenotes/notes/bug-1996188-vmdk-subformat-allow-list-93e6943d9a486d11.yaml
new file mode 100644
index 0000000..9b32921
--- /dev/null
+++ b/releasenotes/notes/bug-1996188-vmdk-subformat-allow-list-93e6943d9a486d11.yaml
@@ -0,0 +1,33 @@
+---
+upgrade:
+  - |
+    This release introduces a new configuration option,
+    ``vmdk_allowed_types``, that specifies the list of VMDK image
+    subformats that Cinder will allow.  The default setting allows
+    only the 'streamOptimized' and 'monolithicSparse' subformats,
+    which do not use named extents.
+security:
+  - |
+    This release introduces a new configuration option,
+    ``vmdk_allowed_types``, that specifies the list of VMDK image
+    subformats that Cinder will allow in order to prevent exposure
+    of host information by modifying the named extents in a VMDK
+    image.  The default setting allows only the 'streamOptimized'
+    and 'monolithicSparse' subformats, which do not use named extents.
+  - |
+    As part of the fix for `Bug #1996188
+    <https://bugs.launchpad.net/cinder/+bug/1996188>`_, cinder is now more
+    strict in checking that the ``disk_format`` recorded for an image (as
+    revealed by the Image Service API image-show response) matches what
+    cinder detects when it downloads the image.  Thus, some requests to
+    create a volume from a source image that had previously succeeded may
+    fail with an ``ImageUnacceptable`` error.
+fixes:
+  - |
+    `Bug #1996188 <https://bugs.launchpad.net/cinder/+bug/1996188>`_:
+    Fixed issue where a VMDK image file whose createType allowed named
+    extents could expose host information.  This change introduces a new
+    configuration option, ``vmdk_allowed_types``, that specifies the list
+    of VMDK image subformats that Cinder will allow.  The default
+    setting allows only the 'streamOptimized' and 'monolithicSparse'
+    subformats.
-- 
2.7.4

openSUSE Build Service is sponsored by