File 0001-pyVmomi-pinned-certificates-support.patch of Package python-pyvmomi
From 9a8956f7b4a91b491e63454b3eb3c59d4abb8a31 Mon Sep 17 00:00:00 2001
From: ddraganov <ddraganov@vmware.com>
Date: Wed, 31 Jul 2024 10:56:55 +0300
Subject: [PATCH] pyVmomi pinned certificates support
New:
SoapStubAdapter and the connect.py wrappers now allows passing a serverPemCert parameter.
serverPemCert is an ASCII string of a PEM-encoded SSL certificate of the host to which a connection is attempted. A replacement of thumbprint. If both fields are set, thumbprint should match serverPemCert.
CertificateMismatchException is thrown when there's a mismatch.
If the standard SSL verifications fails
if serverPemCert or thumbprint is provided try to connect with an unverified connection and try to match the peer certificate
else fail
pyVmomi now has a single point of establishing a server connection SoapAdapter._Connect()
Breaking changes in SoapAdapter.py:
HTTPProxyConnection is removed because it is unnecessary as the connection logic is now streamlined
SSLTunnelConnection is removed and replaced by _SSLTunnelConnection which inherits Python's standard HTTPSConnection.
UnixSocketConnection is removed and replaced by _UnixSocketConnection which inherits Python's standard HTTPConnection.
---
pyVim/connect.py | 41 +++++++-
pyVmomi/Security.py | 38 ++++++++
pyVmomi/SoapAdapter.py | 214 +++++++++++++++++++++++++----------------
3 files changed, 207 insertions(+), 86 deletions(-)
diff --git a/pyVim/connect.py b/pyVim/connect.py
index 0609040..437065c 100644
--- a/pyVim/connect.py
+++ b/pyVim/connect.py
@@ -211,6 +211,7 @@ def Connect(host='localhost',
httpProxyHost=None,
httpProxyPort=80,
thumbprint=None,
+ serverPemCert=None,
sslContext=None,
httpConnectionTimeout=None,
connectionPoolTimeout=CONNECTION_POOL_IDLE_TIMEOUT_SEC,
@@ -260,8 +261,17 @@ def Connect(host='localhost',
@type httpProxyHost: string
@param httpProxyPort The proxy server port.
@type httpProxyPort: int
- @param thumbprint: host cert thumbprint
+ @param thumbprint: **** Deprecated. Use serverPemCert instead.
+ If both fields are set, thumbprint should match
+ serverPemCert.
+ The SHA1/SHA256/SHA512 thumbprint of the server's
+ SSL certificate.
+ Some use a thumbprint of the form xx:xx:xx..:xx.
+ We ignore the ":" characters.
@type thumbprint: string
+ @param serverPemCert: PEM-encoded SSL certificate of the
+ host to which we are connecting.
+ @type serverPemCert: string
@param sslContext: SSL Context describing the various SSL options. It is only
supported in Python 2.7.9 or higher.
@type sslContext: SSL.Context
@@ -324,6 +334,7 @@ def Connect(host='localhost',
httpProxyHost,
httpProxyPort,
thumbprint,
+ serverPemCert,
sslContext,
httpConnectionTimeout,
connectionPoolTimeout,
@@ -395,6 +406,7 @@ def __Login(host,
httpProxyHost,
httpProxyPort,
thumbprint,
+ serverPemCert,
sslContext,
httpConnectionTimeout,
connectionPoolTimeout,
@@ -430,8 +442,17 @@ def __Login(host,
@type httpProxyHost: string
@param httpProxyPort The proxy server port.
@type httpProxyPort: int
- @param thumbprint: host cert thumbprint
+ @param thumbprint: **** Deprecated. Use serverPemCert instead.
+ If both fields are set, thumbprint should match
+ serverPemCert.
+ The SHA1/SHA256/SHA512 thumbprint of the server's
+ SSL certificate.
+ Some use a thumbprint of the form xx:xx:xx..:xx.
+ We ignore the ":" characters.
@type thumbprint: string
+ @param serverPemCert: PEM-encoded SSL certificate of the
+ host to which we are connecting.
+ @type serverPemCert: string
@param sslContext: SSL Context describing the various SSL options. It is only
supported in Python 2.7.9 or higher.
@type sslContext: SSL.Context
@@ -479,6 +500,7 @@ def __Login(host,
httpProxyHost=httpProxyHost,
httpProxyPort=httpProxyPort,
thumbprint=thumbprint,
+ serverPemCert=serverPemCert,
sslContext=sslContext,
httpConnectionTimeout=httpConnectionTimeout,
connectionPoolTimeout=connectionPoolTimeout,
@@ -814,6 +836,7 @@ def SmartStubAdapter(host='localhost',
httpProxyPort=80,
sslProxyPath=None,
thumbprint=None,
+ serverPemCert=None,
cacertsFile=None,
preferredApiVersions=None,
acceptCompressedResponses=True,
@@ -872,6 +895,7 @@ def SmartStubAdapter(host='localhost',
httpProxyPort=httpProxyPort,
sslProxyPath=sslProxyPath,
thumbprint=thumbprint,
+ serverPemCert=serverPemCert,
cacertsFile=cacertsFile,
version=supportedVersion,
acceptCompressedResponses=acceptCompressedResponses,
@@ -896,6 +920,7 @@ def SmartConnect(protocol='https',
httpProxyHost=None,
httpProxyPort=80,
thumbprint=None,
+ serverPemCert=None,
sslContext=None,
httpConnectionTimeout=None,
connectionPoolTimeout=CONNECTION_POOL_IDLE_TIMEOUT_SEC,
@@ -948,8 +973,17 @@ def SmartConnect(protocol='https',
@type httpProxyHost: string
@param httpProxyPort The proxy server port.
@type httpProxyPort: int
- @param thumbprint: host cert thumbprint
+ @param thumbprint: **** Deprecated. Use serverPemCert instead.
+ If both fields are set, thumbprint should match
+ serverPemCert.
+ The SHA1/SHA256/SHA512 thumbprint of the server's
+ SSL certificate.
+ Some use a thumbprint of the form xx:xx:xx..:xx.
+ We ignore the ":" characters.
@type thumbprint: string
+ @param serverPemCert: PEM-encoded SSL certificate of the
+ host to which we are connecting.
+ @type serverPemCert: string
@param sslContext: SSL Context describing the various SSL options. It is only
supported in Python 2.7.9 or higher.
@type sslContext: SSL.Context
@@ -1005,6 +1039,7 @@ def SmartConnect(protocol='https',
httpProxyHost=httpProxyHost,
httpProxyPort=httpProxyPort,
thumbprint=thumbprint,
+ serverPemCert=serverPemCert,
sslContext=sslContext,
httpConnectionTimeout=httpConnectionTimeout,
connectionPoolTimeout=connectionPoolTimeout,
diff --git a/pyVmomi/Security.py b/pyVmomi/Security.py
index 15309ce..a0e69fc 100644
--- a/pyVmomi/Security.py
+++ b/pyVmomi/Security.py
@@ -4,6 +4,7 @@
# Client security module.
import hashlib
+import ssl
_isSha1Enabled = True
_isSha256Enabled = True
@@ -25,6 +26,17 @@ def SetSha512Enabled(state):
_isSha512Enabled = state
+"""
+Verify that a thumbprint matches a certificate
+
+:param derCert: DER-encoded SSL certificate
+:type derCert: str
+:param thumbprint: SHA1/SHA256/SHA512 thumbprint
+ of an SSL certificate
+:type thumbprint: str
+:returns: None
+:raises ThumbprintMismatchException
+"""
def VerifyCertThumbprint(derCert, thumbprint):
thumbprint_len = len(thumbprint)
if thumbprint_len == 40 and _isSha1Enabled:
@@ -49,3 +61,29 @@ class ThumbprintMismatchException(Exception):
self.expected = expected
self.actual = actual
+
+
+"""
+Verify that two PEM certificates match
+
+:param actualCert: PEM-encoded SSL certificate
+:type actualCert: str
+:param expectedCert: PEM-encoded SSL certificate
+:type actualCert: str
+:returns: None
+:raises CertificateMismatchException
+"""
+def VerifyCert(actualCert, expectedCert):
+ actualCert = actualCert.strip()
+ expectedCert = expectedCert.strip()
+ if actualCert != expectedCert:
+ raise CertificateMismatchException(expectedCert, actualCert)
+
+
+class CertificateMismatchException(Exception):
+ def __init__(self, expected, actual):
+ Exception.__init__(self, "Certificate mismatch. Expected: \n{0}, "
+ "actual: \n{1}".format(expected, actual))
+
+ self.expected = expected
+ self.actual = actual
diff --git a/pyVmomi/SoapAdapter.py b/pyVmomi/SoapAdapter.py
index 64835d5..22c81f6 100644
--- a/pyVmomi/SoapAdapter.py
+++ b/pyVmomi/SoapAdapter.py
@@ -34,7 +34,7 @@ from .VmomiSupport import (
GetWsdlName, GetWsdlNamespace, GetWsdlType, GuessWsdlMethod, GuessWsdlType,
IsChildVersion, ManagedMethod, UnknownManagedMethod, ManagedObject,
Object, PropertyPath, Type, binary, versionIdMap, versionMap)
-from .Security import VerifyCertThumbprint
+from .Security import VerifyCert, VerifyCertThumbprint
from .Version import kind
from . import version_info_str
@@ -1054,18 +1054,19 @@ class SoapStubAdapterBase(StubAdapterBase):
# Subclass of HTTPConnection that connects over a Unix domain socket
-# instead of a TCP port. The path of the socket is passed in place of
-# the hostname. Fairly gross but does the job.
-class UnixSocketConnection(HTTPConnection):
- # The HTTPConnection ctor expects a single argument, which it interprets
- # as the host to connect to; for UnixSocketConnection, we instead interpret
+# instead of a TCP port. The path of the socket is passed in place of
+# the hostname. Fairly gross but does the job.
+class _UnixSocketConnection(HTTPConnection):
+ # The HTTPConnection constructor expects a single argument, which it interprets
+ # as the host to connect to; for _UnixSocketConnection, we instead interpret
# the parameter as the filesystem path of the Unix domain socket.
- def __init__(self, path):
+ def __init__(self, host, **kwargs):
# Pass '' as the host to HTTPConnection; it doesn't really matter
# what we pass (since we've overridden the connect method) as long
# as it's a valid string.
- HTTPConnection.__init__(self, '')
- self.path = path
+ # kwargs allows to pass all other HTTPConnection constructor arguments
+ self.path = host
+ HTTPConnection.__init__(self, '', **kwargs)
def connect(self):
# Hijack the connect method of HTTPConnection to connect to the
@@ -1076,76 +1077,90 @@ class UnixSocketConnection(HTTPConnection):
self.sock = sock
-def _VerifyThumbprint(thumbprint, connection):
- """If there is a thumbprint, connect to the server and verify that the
- SSL certificate matches the given thumbprint. An exception is thrown
- if there is a mismatch.
+def _VerifyPinnedIdentity(connection, certificate, thumbprint):
"""
- if thumbprint and isinstance(connection, HTTPSConnection):
- if not connection.sock:
- connection.connect()
- derCert = connection.sock.getpeercert(True)
+ Verify that the server connection SSL certificate
+ matches the given certificate or thumbprint.
+
+ :param connection: Server connection
+ :type connection: HTTPSConnection
+ :param certificate: PEM-encoded SSL certificate of the server
+ :type certificate: str
+ :param thumbprint: SHA1/SHA256/SHA512 thumbprint
+ of an SSL certificate
+ :type thumbprint: str
+ :returns: None
+ :raises ThumbprintMismatchException or CertificateMismatchException
+ """
+ derCert = connection.sock.getpeercert(True)
+ if certificate:
+ pemCert = ssl.DER_cert_to_PEM_cert(derCert)
+ VerifyCert(pemCert, certificate)
+ elif thumbprint:
VerifyCertThumbprint(derCert, thumbprint)
-# Stand-in for the HTTPSConnection class that will connect to a regular HTTP
-# proxy.
-class HTTPProxyConnection(object):
- # @param proxyPath The path to pass to the CONNECT command.
- # @param customHeaders Dictionary with custom HTTP headers.
- def __init__(self, proxyPath, customHeaders=None):
- self.proxyPath = proxyPath
- self.customHeaders = customHeaders if customHeaders else {}
- # Connects to an HTTP proxy server and initiates a tunnel to the destination
- # specified by self.proxyPath.
- #
- # @param addr Address in the form of host:port
- # @param port If no port number is passed,
- # the port is extracted from the addr string
- # @param timeout Connection timeout in seconds
- # @param context SSL Context with the desired SSL options
- # @return HTTPSConnection to the destination
- def __call__(self, addr, port, timeout, context):
- conn = HTTPSConnection(host=addr, port=port,
- timeout=timeout, context=context)
- conn.set_tunnel(self.proxyPath, headers=self.customHeaders)
- return conn
+def _Connect(connection, serverPemCert=None, thumbprint=None):
+ """
+ Connect to the server specified when the connection object was created.
+
+ serverPemCert and thumbprint denote a pre-defined pinned
+ certificate/thumbprint which has been trusted by the user.
+ Whenever provided if that certificate/thumbprint of the peer exactly
+ matches the pinned certificate/thumbprint, then the connection is established.
+
+ :param connection: Server connection
+ :type connection: HTTPConnection
+ :param serverPemCert: PEM-encoded SSL certificate of the server
+ :type serverPemCert: str
+ :param thumbprint: SHA1/SHA256/SHA512 thumbprint
+ of an SSL certificate
+ :type thumbprint: str
+ :returns: HTTPConnection
+ """
+ try:
+ connection.connect()
+ except ssl.SSLCertVerificationError as ex:
+ if serverPemCert or thumbprint:
+ connection._context.check_hostname = False
+ connection._context.verify_mode = ssl.CERT_NONE
+ connection.connect()
+ _VerifyPinnedIdentity(connection, serverPemCert, thumbprint)
+ else:
+ raise ex
+ return connection
-# Stand-in for the HTTPSConnection class that will connect to a proxy and
-# issue a CONNECT command to start an SSL tunnel.
-class SSLTunnelConnection(HTTPProxyConnection):
- # Connects to a proxy server and initiates a tunnel to the destination
- # specified by self.proxyPath.
- # For Python Version < 2.7.9. cert_reqs=CERT_OPTIONAL to verify
- # server certificate
- #
- # @param addr Address in the form of host:port
- # @param port If no port number is passed,
- # the port is extracted from the addr string
- # @param timeout Connection timeout in seconds
- # @param context SSL Context with the desired SSL options
- # @return HTTPSConnection to the destination
- def __call__(self, addr, port=None, timeout=None, context=None):
- tunnelConn = HTTPConnection(host=addr, port=port, timeout=timeout)
- tunnelConn.request('CONNECT', self.proxyPath)
+# A subclass of HTTPConnection that uses SSL through an HTTP proxy tunnel
+class _SSLTunnelConnection(HTTPSConnection):
+
+ def connect(self):
+ tunnelConn = HTTPConnection(host=self.host,
+ port=self.port,
+ timeout=self.timeout)
+ tunnelConn.request('CONNECT', self._proxyPath)
resp = tunnelConn.getresponse()
if resp.status != 200:
raise HTTPException(
"{0} {1}".format(resp.status, resp.reason))
- conn = HTTPSConnection(host=tunnelConn.host,
- port=tunnelConn.port,
- context=context,
- timeout=timeout)
- if conn.host in ('localhost', '127.0.0.1', '::1'):
- conn._context.check_hostname = False
- conn._context.verify_mode = ssl.CERT_NONE
+ if self.host in ('localhost', '127.0.0.1', '::1'):
+ self._context.check_hostname = False
+ self._context.verify_mode = ssl.CERT_NONE
- conn.sock = conn._context.wrap_socket(sock=tunnelConn.sock,
+ self.sock = self._context.wrap_socket(sock=tunnelConn.sock,
server_hostname=tunnelConn.host)
- return conn
+
+ def setVcTunnel(self, proxyPath):
+ """
+ Set the path to use when tunneling through VC's reverse proxy
+ ex: /sdkTunnel
+
+ :param proxyPath: Tunnel path
+ :type proxyPath: str
+ """
+ self._proxyPath = proxyPath
class GzipReader:
@@ -1244,10 +1259,21 @@ class SoapStubAdapter(SoapStubAdapterBase):
# @param httpProxyHost The host name of the proxy server.
# @param httpProxyPort The proxy server port.
# @param sslProxyPath Path to use when tunneling through VC's reverse proxy
- # @param thumbprint The SHA1/SHA256/SHA512 thumbprint of the server's
+ # @param thumbprint **** Deprecated. Use serverPemCert instead.
+ # If both fields are set, thumbprint should match
+ # serverPemCert.
+ # The SHA1/SHA256/SHA512 thumbprint of the server's
# SSL certificate.
- # Some use a thumbprint of the form xx:xx:xx..:xx. We ignore the ":"
- # characters. If set to None, any thumbprint is accepted.
+ # Whenever provided if that thumbprint of the peer's
+ # certificate exactly matches the pinned thumbprint
+ # the connection is established.
+ # Some use a thumbprint of the form xx:xx:xx..:xx.
+ # We ignore the ":" characters.
+ # @param serverPemCert PEM-encoded SSL certificate of the
+ # host to which we are connecting.
+ # Whenever provided if that certificate of the peer
+ # exactly matches the pinned certificate
+ # the connection is established.
# @param cacertsFile **** Deprecated. Please load cert to context and pass
# context instread ****
# sslContext.load_verify_locations(cafile=ca_cert_file)
@@ -1278,6 +1304,7 @@ class SoapStubAdapter(SoapStubAdapterBase):
httpProxyPort=80,
sslProxyPath=None,
thumbprint=None,
+ serverPemCert=None,
cacertsFile=None,
version=None,
acceptCompressedResponses=True,
@@ -1297,12 +1324,16 @@ class SoapStubAdapter(SoapStubAdapterBase):
version = 'vim.version.version9'
SoapStubAdapterBase.__init__(self, version=version, sessionId=sessionId)
if sock:
- self.scheme = UnixSocketConnection
+ self.scheme = _UnixSocketConnection
# Store sock in the host member variable because that's where
- # the UnixSocketConnection ctor expects to find it -- see above
+ # the _UnixSocketConnection ctor expects to find it -- see above
self.host = sock
elif url:
- url_scheme_specifier, self.host, urlpath = urlparse(url)[:3]
+ parse_result = urlparse(url)
+ url_scheme_specifier = parse_result.scheme
+ self.host = parse_result.netloc
+ port = parse_result.port
+ urlpath = parse_result.path
# Only use the URL path if it's sensible, otherwise use the path
# keyword argument as passed in.
if urlpath not in ('', '/'):
@@ -1317,8 +1348,10 @@ class SoapStubAdapter(SoapStubAdapterBase):
if host.find(':') != -1 and host[0] != '[': # is IPv6?
host = '[' + host + ']'
self.host = '{0}:{1}'.format(host, port)
+ self.port = port
self.path = path
+ self.serverPemCert = serverPemCert
if thumbprint:
self.thumbprint = thumbprint.replace(":", "").lower()
if len(self.thumbprint) not in (40, 64, 128):
@@ -1329,18 +1362,17 @@ class SoapStubAdapter(SoapStubAdapterBase):
self.is_tunnel = False
if sslProxyPath:
- self.scheme = SSLTunnelConnection(sslProxyPath, customHeaders)
+ self.sslProxyPath = sslProxyPath
+ self.scheme = _SSLTunnelConnection
self.is_tunnel = True
elif httpProxyHost:
- self.scheme = HTTPProxyConnection(self.host, customHeaders)
- self.is_tunnel = True
-
# Is httpProxyHost IPv6
if httpProxyHost.find(':') != -1 and httpProxyHost[0] != '[':
- httpProxyHost = '[' + httpProxyHost + ']'
-
- # Swap the actual host with the proxy.
- self.host = "{0}:{1}".format(httpProxyHost, httpProxyPort)
+ self.httpProxyHost = '[' + httpProxyHost + ']'
+ else:
+ self.httpProxyHost = httpProxyHost
+ self.httpProxyPort = httpProxyPort
+ self.is_tunnel = True
self.poolSize = poolSize
self.pool = []
self.connectionPoolTimeout = connectionPoolTimeout
@@ -1499,15 +1531,31 @@ class SoapStubAdapter(SoapStubAdapterBase):
self.lock.acquire()
self._CloseIdleConnections()
if self.pool:
- result, _ = self.pool.pop(0)
+ conn, _ = self.pool.pop(0)
self.lock.release()
else:
self.lock.release()
- result = self.scheme(self.host, **self.schemeArgs)
- _VerifyThumbprint(self.thumbprint, result)
+ # Python fails if both host:port pair
+ # and port are used for HTTPConnection
+ host = getattr(self, 'httpProxyHost', self.host.rsplit(":", 1)[0])
+ port = getattr(self, 'httpProxyPort', self.port)
+
+ # Fix for gh-100985 which is fixed
+ # in Python 3.11.9 and Python 3.12.4
+ if host and host[0] == '[' and host[-1] == ']':
+ host = host[1:-1]
+
+ conn = self.scheme(host=host, port=port, **self.schemeArgs)
+ if self.is_tunnel:
+ if hasattr(self, 'sslProxyPath'):
+ conn.setVcTunnel(self.sslProxyPath)
+ elif hasattr(self, 'httpProxyHost'):
+ customHeaders = self._customHeaders if self._customHeaders else {}
+ conn.set_tunnel(host, port, customHeaders)
+ _Connect(connection=conn, serverPemCert=self.serverPemCert, thumbprint=self.thumbprint)
- return result
+ return conn
# Drop all cached connections to the server.
def DropConnections(self):
--
2.46.0