File 0001-pyVmomi-pinned-certificates-support.patch of Package python-pyvmomi

From 9a8956f7b4a91b491e63454b3eb3c59d4abb8a31 Mon Sep 17 00:00:00 2001
From: ddraganov <ddraganov@vmware.com>
Date: Wed, 31 Jul 2024 10:56:55 +0300
Subject: [PATCH] pyVmomi pinned certificates support

New:
SoapStubAdapter and the connect.py wrappers now allows passing a serverPemCert parameter.
serverPemCert is an ASCII string of a PEM-encoded SSL certificate of the host to which a connection is attempted. A replacement of thumbprint. If both fields are set, thumbprint should match serverPemCert.
CertificateMismatchException is thrown when there's a mismatch.

If the standard SSL verifications fails
	if serverPemCert or thumbprint is provided try to connect with an unverified connection and try to match the peer certificate
	else fail

pyVmomi now has a single point of establishing a server connection SoapAdapter._Connect()

Breaking changes in SoapAdapter.py:
	HTTPProxyConnection is removed because it is unnecessary as the connection logic is now streamlined
	SSLTunnelConnection is removed and replaced by _SSLTunnelConnection which inherits Python's standard HTTPSConnection.
	UnixSocketConnection is removed and replaced by _UnixSocketConnection which inherits Python's standard HTTPConnection.
---
 pyVim/connect.py       |  41 +++++++-
 pyVmomi/Security.py    |  38 ++++++++
 pyVmomi/SoapAdapter.py | 214 +++++++++++++++++++++++++----------------
 3 files changed, 207 insertions(+), 86 deletions(-)

diff --git a/pyVim/connect.py b/pyVim/connect.py
index 0609040..437065c 100644
--- a/pyVim/connect.py
+++ b/pyVim/connect.py
@@ -211,6 +211,7 @@ def Connect(host='localhost',
             httpProxyHost=None,
             httpProxyPort=80,
             thumbprint=None,
+            serverPemCert=None,
             sslContext=None,
             httpConnectionTimeout=None,
             connectionPoolTimeout=CONNECTION_POOL_IDLE_TIMEOUT_SEC,
@@ -260,8 +261,17 @@ def Connect(host='localhost',
     @type  httpProxyHost: string
     @param httpProxyPort The proxy server port.
     @type  httpProxyPort: int
-    @param thumbprint: host cert thumbprint
+    @param thumbprint: **** Deprecated. Use serverPemCert instead.
+                        If both fields are set, thumbprint should match
+                        serverPemCert.
+                        The SHA1/SHA256/SHA512 thumbprint of the server's
+                        SSL certificate.
+                        Some use a thumbprint of the form xx:xx:xx..:xx.
+                        We ignore the ":" characters.
     @type  thumbprint: string
+    @param serverPemCert: PEM-encoded SSL certificate of the
+                            host to which we are connecting.
+    @type  serverPemCert: string
     @param sslContext: SSL Context describing the various SSL options. It is only
                         supported in Python 2.7.9 or higher.
     @type  sslContext: SSL.Context
@@ -324,6 +334,7 @@ def Connect(host='localhost',
                        httpProxyHost,
                        httpProxyPort,
                        thumbprint,
+                       serverPemCert,
                        sslContext,
                        httpConnectionTimeout,
                        connectionPoolTimeout,
@@ -395,6 +406,7 @@ def __Login(host,
             httpProxyHost,
             httpProxyPort,
             thumbprint,
+            serverPemCert,
             sslContext,
             httpConnectionTimeout,
             connectionPoolTimeout,
@@ -430,8 +442,17 @@ def __Login(host,
     @type  httpProxyHost: string
     @param httpProxyPort The proxy server port.
     @type  httpProxyPort: int
-    @param thumbprint: host cert thumbprint
+    @param thumbprint: **** Deprecated. Use serverPemCert instead.
+                        If both fields are set, thumbprint should match
+                        serverPemCert.
+                        The SHA1/SHA256/SHA512 thumbprint of the server's
+                        SSL certificate.
+                        Some use a thumbprint of the form xx:xx:xx..:xx.
+                        We ignore the ":" characters.
     @type  thumbprint: string
+    @param serverPemCert: PEM-encoded SSL certificate of the
+                            host to which we are connecting.
+    @type  serverPemCert: string
     @param sslContext: SSL Context describing the various SSL options. It is only
                         supported in Python 2.7.9 or higher.
     @type  sslContext: SSL.Context
@@ -479,6 +500,7 @@ def __Login(host,
         httpProxyHost=httpProxyHost,
         httpProxyPort=httpProxyPort,
         thumbprint=thumbprint,
+        serverPemCert=serverPemCert,
         sslContext=sslContext,
         httpConnectionTimeout=httpConnectionTimeout,
         connectionPoolTimeout=connectionPoolTimeout,
@@ -814,6 +836,7 @@ def SmartStubAdapter(host='localhost',
                      httpProxyPort=80,
                      sslProxyPath=None,
                      thumbprint=None,
+                     serverPemCert=None,
                      cacertsFile=None,
                      preferredApiVersions=None,
                      acceptCompressedResponses=True,
@@ -872,6 +895,7 @@ def SmartStubAdapter(host='localhost',
                            httpProxyPort=httpProxyPort,
                            sslProxyPath=sslProxyPath,
                            thumbprint=thumbprint,
+                           serverPemCert=serverPemCert,
                            cacertsFile=cacertsFile,
                            version=supportedVersion,
                            acceptCompressedResponses=acceptCompressedResponses,
@@ -896,6 +920,7 @@ def SmartConnect(protocol='https',
                  httpProxyHost=None,
                  httpProxyPort=80,
                  thumbprint=None,
+                 serverPemCert=None,
                  sslContext=None,
                  httpConnectionTimeout=None,
                  connectionPoolTimeout=CONNECTION_POOL_IDLE_TIMEOUT_SEC,
@@ -948,8 +973,17 @@ def SmartConnect(protocol='https',
     @type  httpProxyHost: string
     @param httpProxyPort The proxy server port.
     @type  httpProxyPort: int
-    @param thumbprint: host cert thumbprint
+    @param thumbprint: **** Deprecated. Use serverPemCert instead.
+                        If both fields are set, thumbprint should match
+                        serverPemCert.
+                        The SHA1/SHA256/SHA512 thumbprint of the server's
+                        SSL certificate.
+                        Some use a thumbprint of the form xx:xx:xx..:xx.
+                        We ignore the ":" characters.
     @type  thumbprint: string
+    @param serverPemCert: PEM-encoded SSL certificate of the
+                            host to which we are connecting.
+    @type  serverPemCert: string
     @param sslContext: SSL Context describing the various SSL options. It is only
                         supported in Python 2.7.9 or higher.
     @type  sslContext: SSL.Context
@@ -1005,6 +1039,7 @@ def SmartConnect(protocol='https',
                    httpProxyHost=httpProxyHost,
                    httpProxyPort=httpProxyPort,
                    thumbprint=thumbprint,
+                   serverPemCert=serverPemCert,
                    sslContext=sslContext,
                    httpConnectionTimeout=httpConnectionTimeout,
                    connectionPoolTimeout=connectionPoolTimeout,
diff --git a/pyVmomi/Security.py b/pyVmomi/Security.py
index 15309ce..a0e69fc 100644
--- a/pyVmomi/Security.py
+++ b/pyVmomi/Security.py
@@ -4,6 +4,7 @@
 # Client security module.
 
 import hashlib
+import ssl
 
 _isSha1Enabled = True
 _isSha256Enabled = True
@@ -25,6 +26,17 @@ def SetSha512Enabled(state):
     _isSha512Enabled = state
 
 
+"""
+Verify that a thumbprint matches a certificate
+
+:param derCert: DER-encoded SSL certificate
+:type derCert: str
+:param thumbprint: SHA1/SHA256/SHA512 thumbprint
+                   of an SSL certificate
+:type thumbprint: str
+:returns: None
+:raises ThumbprintMismatchException
+"""
 def VerifyCertThumbprint(derCert, thumbprint):
     thumbprint_len = len(thumbprint)
     if thumbprint_len == 40 and _isSha1Enabled:
@@ -49,3 +61,29 @@ class ThumbprintMismatchException(Exception):
 
         self.expected = expected
         self.actual = actual
+
+
+"""
+Verify that two PEM certificates match
+
+:param actualCert: PEM-encoded SSL certificate
+:type actualCert: str
+:param expectedCert: PEM-encoded SSL certificate
+:type actualCert: str
+:returns: None
+:raises CertificateMismatchException
+"""
+def VerifyCert(actualCert, expectedCert):
+    actualCert = actualCert.strip()
+    expectedCert = expectedCert.strip()
+    if actualCert != expectedCert:
+        raise CertificateMismatchException(expectedCert, actualCert)
+
+
+class CertificateMismatchException(Exception):
+    def __init__(self, expected, actual):
+        Exception.__init__(self, "Certificate mismatch. Expected: \n{0}, "
+                                 "actual: \n{1}".format(expected, actual))
+
+        self.expected = expected
+        self.actual = actual
diff --git a/pyVmomi/SoapAdapter.py b/pyVmomi/SoapAdapter.py
index 64835d5..22c81f6 100644
--- a/pyVmomi/SoapAdapter.py
+++ b/pyVmomi/SoapAdapter.py
@@ -34,7 +34,7 @@ from .VmomiSupport import (
     GetWsdlName, GetWsdlNamespace, GetWsdlType, GuessWsdlMethod, GuessWsdlType,
     IsChildVersion, ManagedMethod, UnknownManagedMethod, ManagedObject,
     Object, PropertyPath, Type, binary, versionIdMap, versionMap)
-from .Security import VerifyCertThumbprint
+from .Security import VerifyCert, VerifyCertThumbprint
 from .Version import kind
 from . import version_info_str
 
@@ -1054,18 +1054,19 @@ class SoapStubAdapterBase(StubAdapterBase):
 
 
 # Subclass of HTTPConnection that connects over a Unix domain socket
-# instead of a TCP port.  The path of the socket is passed in place of
-# the hostname.  Fairly gross but does the job.
-class UnixSocketConnection(HTTPConnection):
-    # The HTTPConnection ctor expects a single argument, which it interprets
-    # as the host to connect to; for UnixSocketConnection, we instead interpret
+# instead of a TCP port. The path of the socket is passed in place of
+# the hostname. Fairly gross but does the job.
+class _UnixSocketConnection(HTTPConnection):
+    # The HTTPConnection constructor expects a single argument, which it interprets
+    # as the host to connect to; for _UnixSocketConnection, we instead interpret
     # the parameter as the filesystem path of the Unix domain socket.
-    def __init__(self, path):
+    def __init__(self, host, **kwargs):
         # Pass '' as the host to HTTPConnection; it doesn't really matter
         # what we pass (since we've overridden the connect method) as long
         # as it's a valid string.
-        HTTPConnection.__init__(self, '')
-        self.path = path
+        # kwargs allows to pass all other HTTPConnection constructor arguments
+        self.path = host
+        HTTPConnection.__init__(self, '', **kwargs)
 
     def connect(self):
         # Hijack the connect method of HTTPConnection to connect to the
@@ -1076,76 +1077,90 @@ class UnixSocketConnection(HTTPConnection):
         self.sock = sock
 
 
-def _VerifyThumbprint(thumbprint, connection):
-    """If there is a thumbprint, connect to the server and verify that the
-    SSL certificate matches the given thumbprint.  An exception is thrown
-    if there is a mismatch.
+def _VerifyPinnedIdentity(connection, certificate, thumbprint):
     """
-    if thumbprint and isinstance(connection, HTTPSConnection):
-        if not connection.sock:
-            connection.connect()
-        derCert = connection.sock.getpeercert(True)
+    Verify that the server connection SSL certificate
+    matches the given certificate or thumbprint.
+
+    :param connection: Server connection
+    :type connection: HTTPSConnection
+    :param certificate: PEM-encoded SSL certificate of the server
+    :type certificate: str
+    :param thumbprint: SHA1/SHA256/SHA512 thumbprint
+                       of an SSL certificate
+    :type thumbprint: str
+    :returns: None
+    :raises ThumbprintMismatchException or CertificateMismatchException
+    """
+    derCert = connection.sock.getpeercert(True)
+    if certificate:
+        pemCert = ssl.DER_cert_to_PEM_cert(derCert)
+        VerifyCert(pemCert, certificate)
+    elif thumbprint:
         VerifyCertThumbprint(derCert, thumbprint)
 
 
-# Stand-in for the HTTPSConnection class that will connect to a regular HTTP
-# proxy.
-class HTTPProxyConnection(object):
-    # @param proxyPath The path to pass to the CONNECT command.
-    # @param customHeaders Dictionary with custom HTTP headers.
-    def __init__(self, proxyPath, customHeaders=None):
-        self.proxyPath = proxyPath
-        self.customHeaders = customHeaders if customHeaders else {}
 
-    # Connects to an HTTP proxy server and initiates a tunnel to the destination
-    # specified by self.proxyPath.
-    #
-    # @param addr Address in the form of host:port
-    # @param port If no port number is passed,
-    #             the port is extracted from the addr string
-    # @param timeout Connection timeout in seconds
-    # @param context SSL Context with the desired SSL options
-    # @return HTTPSConnection to the destination
-    def __call__(self, addr, port, timeout, context):
-        conn = HTTPSConnection(host=addr, port=port,
-                               timeout=timeout, context=context)
-        conn.set_tunnel(self.proxyPath, headers=self.customHeaders)
-        return conn
+def _Connect(connection, serverPemCert=None, thumbprint=None):
+    """
+    Connect to the server specified when the connection object was created.
+
+    serverPemCert and thumbprint denote a pre-defined pinned
+    certificate/thumbprint which has been trusted by the user.
+    Whenever provided if that certificate/thumbprint of the peer exactly
+    matches the pinned certificate/thumbprint, then the connection is established.
+
+    :param connection: Server connection
+    :type connection: HTTPConnection
+    :param serverPemCert: PEM-encoded SSL certificate of the server
+    :type serverPemCert: str
+    :param thumbprint: SHA1/SHA256/SHA512 thumbprint
+                       of an SSL certificate
+    :type thumbprint: str
+    :returns: HTTPConnection
+    """
+    try:
+        connection.connect()
+    except ssl.SSLCertVerificationError as ex:
+        if serverPemCert or thumbprint:
+            connection._context.check_hostname = False
+            connection._context.verify_mode = ssl.CERT_NONE
+            connection.connect()
+            _VerifyPinnedIdentity(connection, serverPemCert, thumbprint)
+        else:
+            raise ex
+    return connection
 
 
-# Stand-in for the HTTPSConnection class that will connect to a proxy and
-# issue a CONNECT command to start an SSL tunnel.
-class SSLTunnelConnection(HTTPProxyConnection):
-    # Connects to a proxy server and initiates a tunnel to the destination
-    # specified by self.proxyPath.
-    # For Python Version < 2.7.9. cert_reqs=CERT_OPTIONAL to verify
-    # server certificate
-    #
-    # @param addr Address in the form of host:port
-    # @param port If no port number is passed,
-    #             the port is extracted from the addr string
-    # @param timeout Connection timeout in seconds
-    # @param context SSL Context with the desired SSL options
-    # @return HTTPSConnection to the destination
-    def __call__(self, addr, port=None, timeout=None, context=None):
-        tunnelConn = HTTPConnection(host=addr, port=port, timeout=timeout)
-        tunnelConn.request('CONNECT', self.proxyPath)
+# A subclass of HTTPConnection that uses SSL through an HTTP proxy tunnel
+class _SSLTunnelConnection(HTTPSConnection):
+
+    def connect(self):
+        tunnelConn = HTTPConnection(host=self.host,
+                                    port=self.port,
+                                    timeout=self.timeout)
+        tunnelConn.request('CONNECT', self._proxyPath)
         resp = tunnelConn.getresponse()
         if resp.status != 200:
             raise HTTPException(
                 "{0} {1}".format(resp.status, resp.reason))
 
-        conn = HTTPSConnection(host=tunnelConn.host,
-                               port=tunnelConn.port,
-                               context=context,
-                               timeout=timeout)
-        if conn.host in ('localhost', '127.0.0.1', '::1'):
-            conn._context.check_hostname = False
-            conn._context.verify_mode = ssl.CERT_NONE
+        if self.host in ('localhost', '127.0.0.1', '::1'):
+            self._context.check_hostname = False
+            self._context.verify_mode = ssl.CERT_NONE
 
-        conn.sock = conn._context.wrap_socket(sock=tunnelConn.sock,
+        self.sock = self._context.wrap_socket(sock=tunnelConn.sock,
                                               server_hostname=tunnelConn.host)
-        return conn
+
+    def setVcTunnel(self, proxyPath):
+        """
+        Set the path to use when tunneling through VC's reverse proxy
+        ex: /sdkTunnel
+
+        :param proxyPath: Tunnel path
+        :type proxyPath: str
+        """
+        self._proxyPath = proxyPath
 
 
 class GzipReader:
@@ -1244,10 +1259,21 @@ class SoapStubAdapter(SoapStubAdapterBase):
     # @param httpProxyHost The host name of the proxy server.
     # @param httpProxyPort The proxy server port.
     # @param sslProxyPath Path to use when tunneling through VC's reverse proxy
-    # @param thumbprint The SHA1/SHA256/SHA512 thumbprint of the server's
+    # @param thumbprint **** Deprecated. Use serverPemCert instead.
+    #                   If both fields are set, thumbprint should match
+    #                   serverPemCert.
+    #                   The SHA1/SHA256/SHA512 thumbprint of the server's
     #                   SSL certificate.
-    #   Some use a thumbprint of the form xx:xx:xx..:xx.  We ignore the ":"
-    #   characters.  If set to None, any thumbprint is accepted.
+    #                   Whenever provided if that thumbprint of the peer's
+    #                   certificate exactly matches the pinned thumbprint
+    #                   the connection is established.
+    #                   Some use a thumbprint of the form xx:xx:xx..:xx.
+    #                   We ignore the ":" characters.
+    # @param serverPemCert PEM-encoded SSL certificate of the
+    #                       host to which we are connecting.
+    #                       Whenever provided if that certificate of the peer
+    #                       exactly matches the pinned certificate
+    #                       the connection is established.
     # @param cacertsFile **** Deprecated. Please load cert to context and pass
     #                    context instread ****
     #                    sslContext.load_verify_locations(cafile=ca_cert_file)
@@ -1278,6 +1304,7 @@ class SoapStubAdapter(SoapStubAdapterBase):
                  httpProxyPort=80,
                  sslProxyPath=None,
                  thumbprint=None,
+                 serverPemCert=None,
                  cacertsFile=None,
                  version=None,
                  acceptCompressedResponses=True,
@@ -1297,12 +1324,16 @@ class SoapStubAdapter(SoapStubAdapterBase):
             version = 'vim.version.version9'
         SoapStubAdapterBase.__init__(self, version=version, sessionId=sessionId)
         if sock:
-            self.scheme = UnixSocketConnection
+            self.scheme = _UnixSocketConnection
             # Store sock in the host member variable because that's where
-            # the UnixSocketConnection ctor expects to find it -- see above
+            # the _UnixSocketConnection ctor expects to find it -- see above
             self.host = sock
         elif url:
-            url_scheme_specifier, self.host, urlpath = urlparse(url)[:3]
+            parse_result = urlparse(url)
+            url_scheme_specifier = parse_result.scheme
+            self.host = parse_result.netloc
+            port = parse_result.port
+            urlpath = parse_result.path
             # Only use the URL path if it's sensible, otherwise use the path
             # keyword argument as passed in.
             if urlpath not in ('', '/'):
@@ -1317,8 +1348,10 @@ class SoapStubAdapter(SoapStubAdapterBase):
             if host.find(':') != -1 and host[0] != '[':  # is IPv6?
                 host = '[' + host + ']'
             self.host = '{0}:{1}'.format(host, port)
+        self.port = port
 
         self.path = path
+        self.serverPemCert = serverPemCert
         if thumbprint:
             self.thumbprint = thumbprint.replace(":", "").lower()
             if len(self.thumbprint) not in (40, 64, 128):
@@ -1329,18 +1362,17 @@ class SoapStubAdapter(SoapStubAdapterBase):
 
         self.is_tunnel = False
         if sslProxyPath:
-            self.scheme = SSLTunnelConnection(sslProxyPath, customHeaders)
+            self.sslProxyPath = sslProxyPath
+            self.scheme = _SSLTunnelConnection
             self.is_tunnel = True
         elif httpProxyHost:
-            self.scheme = HTTPProxyConnection(self.host, customHeaders)
-            self.is_tunnel = True
-
             # Is httpProxyHost IPv6
             if httpProxyHost.find(':') != -1 and httpProxyHost[0] != '[':
-                httpProxyHost = '[' + httpProxyHost + ']'
-
-            # Swap the actual host with the proxy.
-            self.host = "{0}:{1}".format(httpProxyHost, httpProxyPort)
+                self.httpProxyHost = '[' + httpProxyHost + ']'
+            else:
+                self.httpProxyHost = httpProxyHost
+            self.httpProxyPort = httpProxyPort
+            self.is_tunnel = True
         self.poolSize = poolSize
         self.pool = []
         self.connectionPoolTimeout = connectionPoolTimeout
@@ -1499,15 +1531,31 @@ class SoapStubAdapter(SoapStubAdapterBase):
         self.lock.acquire()
         self._CloseIdleConnections()
         if self.pool:
-            result, _ = self.pool.pop(0)
+            conn, _ = self.pool.pop(0)
             self.lock.release()
         else:
             self.lock.release()
-            result = self.scheme(self.host, **self.schemeArgs)
 
-            _VerifyThumbprint(self.thumbprint, result)
+            # Python fails if both host:port pair
+            # and port are used for HTTPConnection
+            host = getattr(self, 'httpProxyHost', self.host.rsplit(":", 1)[0])
+            port = getattr(self, 'httpProxyPort', self.port)
+
+            # Fix for gh-100985 which is fixed
+            # in Python 3.11.9 and Python 3.12.4
+            if host and host[0] == '[' and host[-1] == ']':
+               host = host[1:-1]
+
+            conn = self.scheme(host=host, port=port, **self.schemeArgs)
+            if self.is_tunnel:
+                if hasattr(self, 'sslProxyPath'):
+                    conn.setVcTunnel(self.sslProxyPath)
+                elif hasattr(self, 'httpProxyHost'):
+                    customHeaders = self._customHeaders if self._customHeaders else {}
+                    conn.set_tunnel(host, port, customHeaders)
+            _Connect(connection=conn, serverPemCert=self.serverPemCert, thumbprint=self.thumbprint)
 
-        return result
+        return conn
 
     # Drop all cached connections to the server.
     def DropConnections(self):
-- 
2.46.0

openSUSE Build Service is sponsored by