File fix-multiple-security-issues-bsc-1197417.patch of Package salt
From c56be5236f29416ecfbd746ee8d25b886e7a8893 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
<psuarezhernandez@suse.com>
Date: Wed, 23 Mar 2022 16:16:49 +0000
Subject: [PATCH] Fix multiple security issues (bsc#1197417)
* Sign authentication replies to prevent MiTM (CVE-2020-22935)
* Sign pillar data to prevent MiTM attacks. (CVE-2022-22934)
* Prevent job and fileserver replays (CVE-2022-22936)
* Fixed targeting bug, especially visible when using syndic and user auth. (CVE-2022-22941)
---
requirements/static/linux.in | 5 +-
salt/crypt.py | 308 +++---
salt/master.py | 65 +-
salt/pillar/__init__.py | 5 +-
salt/transport/mixins/auth.py | 323 +++---
salt/transport/tcp.py | 173 ++-
salt/transport/zeromq.py | 157 ++-
salt/utils/minions.py | 25 +-
tests/pytests/unit/test_crypt.py | 149 +++
tests/pytests/unit/transport/test_zeromq.py | 1042 +++++++++++++++++++
tests/unit/transport/mixins.py | 12 +-
tests/unit/transport/test_tcp.py | 91 +-
tests/unit/transport/test_zeromq.py | 124 ++-
13 files changed, 2008 insertions(+), 471 deletions(-)
create mode 100644 tests/pytests/unit/transport/test_zeromq.py
diff --git a/requirements/static/linux.in b/requirements/static/linux.in
index c20b006e2b..bed8943683 100644
--- a/requirements/static/linux.in
+++ b/requirements/static/linux.in
@@ -1,5 +1,6 @@
apache-libcloud==2.0.0
boto3
+azure==4.0.0; sys_platform != "win32"
boto>=2.46.0
certifi
cffi
@@ -20,8 +21,8 @@ kubernetes<4.0
libnacl==1.6.0
mock>=3.0.5
more-itertools==5.0.0
-moto
-paramiko>=2.1.6
+moto; python_version >= '3.6'
+paramiko>=2.1.6; python_version >= '3.6'
psutil
pygit2
pyinotify
diff --git a/salt/crypt.py b/salt/crypt.py
index be426c6ab3..d025d807be 100644
--- a/salt/crypt.py
+++ b/salt/crypt.py
@@ -20,6 +20,7 @@ import logging
import stat
import traceback
import binascii
+import uuid
import weakref
import getpass
import salt.ext.tornado.gen
@@ -255,7 +256,11 @@ def verify_signature(pubkey_path, message, signature):
md = EVP.MessageDigest('sha1')
md.update(salt.utils.stringutils.to_bytes(message))
digest = md.final()
- return pubkey.verify(digest, signature)
+ try:
+ return pubkey.verify(digest, signature)
+ except RSA.RSAError as exc:
+ log.debug("Signature verification failed: %s", exc.args[0])
+ return False
else:
verifier = PKCS1_v1_5.new(pubkey)
return verifier.verify(SHA.new(salt.utils.stringutils.to_bytes(message)), signature)
@@ -655,10 +660,20 @@ class AsyncAuth(object):
self._authenticate_future.set_exception(error)
else:
key = self.__key(self.opts)
- AsyncAuth.creds_map[key] = creds
- self._creds = creds
- self._crypticle = Crypticle(self.opts, creds['aes'])
- self._authenticate_future.set_result(True) # mark the sign-in as complete
+ if key not in AsyncAuth.creds_map:
+ log.debug("%s Got new master aes key.", self)
+ AsyncAuth.creds_map[key] = creds
+ self._creds = creds
+ self._crypticle = Crypticle(self.opts, creds["aes"])
+ elif self._creds["aes"] != creds["aes"]:
+ log.debug("%s The master's aes key has changed.", self)
+ AsyncAuth.creds_map[key] = creds
+ self._creds = creds
+ self._crypticle = Crypticle(self.opts, creds["aes"])
+
+ self._authenticate_future.set_result(
+ True
+ ) # mark the sign-in as complete
# Notify the bus about creds change
if self.opts.get('auth_events') is True:
with salt.utils.event.get_event(self.opts.get('__role'), opts=self.opts, listen=False) as event:
@@ -681,7 +696,6 @@ class AsyncAuth(object):
with the publication port and the shared AES key.
'''
- auth = {}
auth_timeout = self.opts.get('auth_timeout', None)
if auth_timeout is not None:
@@ -693,10 +707,6 @@ class AsyncAuth(object):
if auth_tries is not None:
tries = auth_tries
- m_pub_fn = os.path.join(self.opts['pki_dir'], self.mpub)
-
- auth['master_uri'] = self.opts['master_uri']
-
close_channel = False
if not channel:
close_channel = True
@@ -722,55 +732,88 @@ class AsyncAuth(object):
finally:
if close_channel:
channel.close()
+ ret = self.handle_signin_response(sign_in_payload, payload)
+ raise salt.ext.tornado.gen.Return(ret)
- if not isinstance(payload, dict):
- log.error('Sign-in attempt failed: %s', payload)
- raise salt.ext.tornado.gen.Return(False)
- if 'load' in payload:
- if 'ret' in payload['load']:
- if not payload['load']['ret']:
- if self.opts['rejected_retry']:
- log.error(
- 'The Salt Master has rejected this minion\'s public '
- 'key.\nTo repair this issue, delete the public key '
- 'for this minion on the Salt Master.\nThe Salt '
- 'Minion will attempt to to re-authenicate.'
- )
- raise salt.ext.tornado.gen.Return('retry')
- else:
- log.critical(
- 'The Salt Master has rejected this minion\'s public '
- 'key!\nTo repair this issue, delete the public key '
- 'for this minion on the Salt Master and restart this '
- 'minion.\nOr restart the Salt Master in open mode to '
- 'clean out the keys. The Salt Minion will now exit.'
- )
- # Add a random sleep here for systems that are using a
- # a service manager to immediately restart the service
- # to avoid overloading the system
- time.sleep(random.randint(10, 20))
- sys.exit(salt.defaults.exitcodes.EX_NOPERM)
- # has the master returned that its maxed out with minions?
- elif payload['load']['ret'] == 'full':
- raise salt.ext.tornado.gen.Return('full')
- else:
+ def handle_signin_response(self, sign_in_payload, payload):
+ auth = {}
+ m_pub_fn = os.path.join(self.opts["pki_dir"], self.mpub)
+ auth["master_uri"] = self.opts["master_uri"]
+ if not isinstance(payload, dict) or "load" not in payload:
+ log.error("Sign-in attempt failed: %s", payload)
+ return False
+
+ clear_signed_data = payload["load"]
+ clear_signature = payload["sig"]
+ payload = self.serial.loads(clear_signed_data)
+
+ if "pub_key" in payload:
+ auth["aes"] = self.verify_master(
+ payload, master_pub="token" in sign_in_payload
+ )
+ if not auth["aes"]:
+ log.critical(
+ "The Salt Master server's public key did not authenticate!\n"
+ "The master may need to be updated if it is a version of Salt "
+ "lower than %s, or\n"
+ "If you are confident that you are connecting to a valid Salt "
+ "Master, then remove the master public key and restart the "
+ "Salt Minion.\nThe master public key can be found "
+ "at:\n%s",
+ salt.version.__version__,
+ m_pub_fn,
+ )
+ raise SaltClientError("Invalid master key")
+
+ master_pubkey_path = os.path.join(self.opts["pki_dir"], self.mpub)
+ if os.path.exists(master_pubkey_path) and not verify_signature(
+ master_pubkey_path, clear_signed_data, clear_signature
+ ):
+ log.critical("The payload signature did not validate.")
+ raise SaltClientError("Invalid signature")
+
+ if payload["nonce"] != sign_in_payload["nonce"]:
+ log.critical("The payload nonce did not validate.")
+ raise SaltClientError("Invalid nonce")
+
+ if "ret" in payload:
+ if not payload["ret"]:
+ if self.opts["rejected_retry"]:
log.error(
- 'The Salt Master has cached the public key for this '
- 'node, this salt minion will wait for %s seconds '
- 'before attempting to re-authenticate',
- self.opts['acceptance_wait_time']
+ "The Salt Master has rejected this minion's public "
+ "key.\nTo repair this issue, delete the public key "
+ "for this minion on the Salt Master.\nThe Salt "
+ "Minion will attempt to re-authenicate."
)
- raise salt.ext.tornado.gen.Return('retry')
- auth['aes'] = self.verify_master(payload, master_pub='token' in sign_in_payload)
- if not auth['aes']:
- log.critical(
- 'The Salt Master server\'s public key did not authenticate!\n'
- 'The master may need to be updated if it is a version of Salt '
- 'lower than %s, or\n'
- 'If you are confident that you are connecting to a valid Salt '
- 'Master, then remove the master public key and restart the '
- 'Salt Minion.\nThe master public key can be found '
- 'at:\n%s', salt.version.__version__, m_pub_fn
+ return "retry"
+ else:
+ log.critical(
+ "The Salt Master has rejected this minion's public "
+ "key!\nTo repair this issue, delete the public key "
+ "for this minion on the Salt Master and restart this "
+ "minion.\nOr restart the Salt Master in open mode to "
+ "clean out the keys. The Salt Minion will now exit."
+ )
+ # Add a random sleep here for systems that are using a
+ # a service manager to immediately restart the service
+ # to avoid overloading the system
+ time.sleep(random.randint(10, 20))
+ sys.exit(salt.defaults.exitcodes.EX_NOPERM)
+ # has the master returned that its maxed out with minions?
+ elif payload["ret"] == "full":
+ return "full"
+ else:
+ log.error(
+ "The Salt Master has cached the public key for this "
+ "node, this salt minion will wait for %s seconds "
+ "before attempting to re-authenticate",
+ self.opts["acceptance_wait_time"],
+ )
+ return "retry"
+
+ if self.opts.get("syndic_master", False): # Is syndic
+ syndic_finger = self.opts.get(
+ "syndic_finger", self.opts.get("master_finger", False)
)
raise SaltClientError('Invalid master key')
if self.opts.get('syndic_master', False): # Is syndic
@@ -779,11 +822,17 @@ class AsyncAuth(object):
if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != syndic_finger:
self._finger_fail(syndic_finger, m_pub_fn)
else:
- if self.opts.get('master_finger', False):
- if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != self.opts['master_finger']:
- self._finger_fail(self.opts['master_finger'], m_pub_fn)
- auth['publish_port'] = payload['publish_port']
- raise salt.ext.tornado.gen.Return(auth)
+ if self.opts.get("master_finger", False):
+ if (
+ salt.utils.crypt.pem_finger(
+ m_pub_fn, sum_type=self.opts["hash_type"]
+ )
+ != self.opts["master_finger"]
+ ):
+ self._finger_fail(self.opts["master_finger"], m_pub_fn)
+
+ auth["publish_port"] = payload["publish_port"]
+ return auth
def get_keys(self):
'''
@@ -827,9 +876,10 @@ class AsyncAuth(object):
:rtype: dict
'''
payload = {}
- payload['cmd'] = '_auth'
- payload['id'] = self.opts['id']
- if 'autosign_grains' in self.opts:
+ payload["cmd"] = "_auth"
+ payload["id"] = self.opts["id"]
+ payload["nonce"] = uuid.uuid4().hex
+ if "autosign_grains" in self.opts:
autosign_grains = {}
for grain in self.opts['autosign_grains']:
autosign_grains[grain] = self.opts['grains'].get(grain, None)
@@ -1191,13 +1241,15 @@ class SAuth(AsyncAuth):
self.token = Crypticle.generate_key_string()
else:
self.token = salt.utils.stringutils.to_bytes(Crypticle.generate_key_string())
+
self.serial = salt.payload.Serial(self.opts)
- self.pub_path = os.path.join(self.opts['pki_dir'], 'minion.pub')
- self.rsa_path = os.path.join(self.opts['pki_dir'], 'minion.pem')
- if 'syndic_master' in self.opts:
- self.mpub = 'syndic_master.pub'
- elif 'alert_master' in self.opts:
- self.mpub = 'monitor_master.pub'
+ self.pub_path = os.path.join(self.opts["pki_dir"], "minion.pub")
+ self.rsa_path = os.path.join(self.opts["pki_dir"], "minion.pem")
+ self._creds = None
+ if "syndic_master" in self.opts:
+ self.mpub = "syndic_master.pub"
+ elif "alert_master" in self.opts:
+ self.mpub = "monitor_master.pub"
else:
self.mpub = 'minion_master.pub'
if not os.path.isfile(self.pub_path):
@@ -1253,8 +1305,14 @@ class SAuth(AsyncAuth):
log.debug('Authentication wait time is %s', acceptance_wait_time)
continue
break
- self._creds = creds
- self._crypticle = Crypticle(self.opts, creds['aes'])
+ if self._creds is None:
+ log.error("%s Got new master aes key.", self)
+ self._creds = creds
+ self._crypticle = Crypticle(self.opts, creds["aes"])
+ elif self._creds["aes"] != creds["aes"]:
+ log.error("%s The master's aes key has changed.", self)
+ self._creds = creds
+ self._crypticle = Crypticle(self.opts, creds["aes"])
def sign_in(self, timeout=60, safe=True, tries=1, channel=None):
'''
@@ -1309,63 +1367,7 @@ class SAuth(AsyncAuth):
if close_channel:
channel.close()
- if 'load' in payload:
- if 'ret' in payload['load']:
- if not payload['load']['ret']:
- if self.opts['rejected_retry']:
- log.error(
- 'The Salt Master has rejected this minion\'s public '
- 'key.\nTo repair this issue, delete the public key '
- 'for this minion on the Salt Master.\nThe Salt '
- 'Minion will attempt to to re-authenicate.'
- )
- return 'retry'
- else:
- log.critical(
- 'The Salt Master has rejected this minion\'s public '
- 'key!\nTo repair this issue, delete the public key '
- 'for this minion on the Salt Master and restart this '
- 'minion.\nOr restart the Salt Master in open mode to '
- 'clean out the keys. The Salt Minion will now exit.'
- )
- sys.exit(salt.defaults.exitcodes.EX_NOPERM)
- # has the master returned that its maxed out with minions?
- elif payload['load']['ret'] == 'full':
- return 'full'
- else:
- log.error(
- 'The Salt Master has cached the public key for this '
- 'node. If this is the first time connecting to this '
- 'master then this key may need to be accepted using '
- '\'salt-key -a %s\' on the salt master. This salt '
- 'minion will wait for %s seconds before attempting '
- 'to re-authenticate.',
- self.opts['id'], self.opts['acceptance_wait_time']
- )
- return 'retry'
- auth['aes'] = self.verify_master(payload, master_pub='token' in sign_in_payload)
- if not auth['aes']:
- log.critical(
- 'The Salt Master server\'s public key did not authenticate!\n'
- 'The master may need to be updated if it is a version of Salt '
- 'lower than %s, or\n'
- 'If you are confident that you are connecting to a valid Salt '
- 'Master, then remove the master public key and restart the '
- 'Salt Minion.\nThe master public key can be found '
- 'at:\n%s', salt.version.__version__, m_pub_fn
- )
- sys.exit(42)
- if self.opts.get('syndic_master', False): # Is syndic
- syndic_finger = self.opts.get('syndic_finger', self.opts.get('master_finger', False))
- if syndic_finger:
- if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != syndic_finger:
- self._finger_fail(syndic_finger, m_pub_fn)
- else:
- if self.opts.get('master_finger', False):
- if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != self.opts['master_finger']:
- self._finger_fail(self.opts['master_finger'], m_pub_fn)
- auth['publish_port'] = payload['publish_port']
- return auth
+ return self.handle_signin_response(sign_in_payload, payload)
class Crypticle(object):
@@ -1380,11 +1382,11 @@ class Crypticle(object):
AES_BLOCK_SIZE = 16
SIG_SIZE = hashlib.sha256().digest_size
- def __init__(self, opts, key_string, key_size=192):
+ def __init__(self, opts, key_string, key_size=192, serial=0):
self.key_string = key_string
self.keys = self.extract_keys(self.key_string, key_size)
self.key_size = key_size
- self.serial = salt.payload.Serial(opts)
+ self.serial = serial
@classmethod
def generate_key_string(cls, key_size=192):
@@ -1464,19 +1466,45 @@ class Crypticle(object):
else:
return data[:-data[-1]]
- def dumps(self, obj):
- '''
+ def dumps(self, obj, nonce=None):
+ """
Serialize and encrypt a python object
- '''
- return self.encrypt(self.PICKLE_PAD + self.serial.dumps(obj))
+ """
+ if nonce:
+ toencrypt = (
+ self.PICKLE_PAD + nonce.encode() + salt.payload.Serial({}).dumps(obj)
+ )
+ else:
+ toencrypt = self.PICKLE_PAD + salt.payload.Serial({}).dumps(obj)
+ return self.encrypt(toencrypt)
- def loads(self, data, raw=False):
- '''
+ def loads(self, data, raw=False, nonce=None):
+ """
Decrypt and un-serialize a python object
- '''
+ """
data = self.decrypt(data)
# simple integrity check to verify that we got meaningful data
if not data.startswith(self.PICKLE_PAD):
return {}
- load = self.serial.loads(data[len(self.PICKLE_PAD):], raw=raw)
- return load
+ data = data[len(self.PICKLE_PAD) :]
+ if nonce:
+ ret_nonce = data[:32].decode()
+ data = data[32:]
+ if ret_nonce != nonce:
+ raise SaltClientError("Nonce verification error")
+ payload = salt.payload.Serial({}).loads(data, raw=raw)
+ if isinstance(payload, dict):
+ if "serial" in payload:
+ serial = payload.pop("serial")
+ if serial <= self.serial:
+ log.critical(
+ "A message with an invalid serial was received.\n"
+ "this serial: %d\n"
+ "last serial: %d\n"
+ "The minion will not honor this request.",
+ serial,
+ self.serial,
+ )
+ return {}
+ self.serial = serial
+ return payload
diff --git a/salt/master.py b/salt/master.py
index 8bccc00036..dda920b511 100644
--- a/salt/master.py
+++ b/salt/master.py
@@ -138,6 +138,51 @@ class SMaster(object):
'''
return salt.daemons.masterapi.access_keys(self.opts)
+ @classmethod
+ def get_serial(cls, opts=None, event=None, lock=True):
+ if lock:
+ with cls.secrets["aes"]["secret"].get_lock():
+ if cls.secrets["aes"]["serial"].value == sys.maxsize:
+ cls.rotate_secrets(opts, event, use_lock=False)
+ else:
+ cls.secrets["aes"]["serial"].value += 1
+ return cls.secrets["aes"]["serial"].value
+ else:
+ if cls.secrets["aes"]["serial"].value == sys.maxsize:
+ cls.rotate_secrets(opts, event, use_lock=False)
+ else:
+ cls.secrets["aes"]["serial"].value += 1
+ return cls.secrets["aes"]["serial"].value
+
+ @classmethod
+ def rotate_secrets(cls, opts=None, event=None, use_lock=True):
+ log.info("Rotating master AES key")
+ if opts is None:
+ opts = {}
+
+ for secret_key, secret_map in cls.secrets.items():
+ # should be unnecessary-- since no one else should be modifying
+ if use_lock:
+ with secret_map["secret"].get_lock():
+ secret_map["secret"].value = salt.utils.stringutils.to_bytes(
+ secret_map["reload"]()
+ )
+ if "serial" in secret_map:
+ secret_map["serial"].value = 0
+ else:
+ secret_map["secret"].value = salt.utils.stringutils.to_bytes(
+ secret_map["reload"]()
+ )
+ if "serial" in secret_map:
+ secret_map["serial"].value = 0
+ if event:
+ event.fire_event({"rotate_{}_key".format(secret_key): True}, tag="key")
+
+ if opts.get("ping_on_rotate"):
+ # Ping all minions to get them to pick up the new key
+ log.debug("Pinging all connected minions due to key rotation")
+ salt.utils.master.ping_all_connected_minions(opts)
+
class Maintenance(salt.utils.process.SignalHandlingProcess):
'''
@@ -298,18 +343,8 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
to_rotate = True
if to_rotate:
- log.info('Rotating master AES key')
- for secret_key, secret_map in six.iteritems(SMaster.secrets):
- # should be unnecessary-- since no one else should be modifying
- with secret_map['secret'].get_lock():
- secret_map['secret'].value = salt.utils.stringutils.to_bytes(secret_map['reload']())
- self.event.fire_event({'rotate_{0}_key'.format(secret_key): True}, tag='key')
+ SMaster.rotate_secrets(self.opts, self.event)
self.rotate = now
- if self.opts.get('ping_on_rotate'):
- # Ping all minions to get them to pick up the new key
- log.debug('Pinging all connected minions '
- 'due to key rotation')
- salt.utils.master.ping_all_connected_minions(self.opts)
def handle_git_pillar(self):
'''
@@ -671,9 +706,13 @@ class Master(SMaster):
salt.crypt.Crypticle.generate_key_string()
)
),
- 'reload': salt.crypt.Crypticle.generate_key_string
+ "serial": multiprocessing.Value(
+ ctypes.c_longlong, lock=False # We'll use the lock from 'secret'
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
}
- log.info('Creating master process manager')
+
+ log.info("Creating master process manager")
# Since there are children having their own ProcessManager we should wait for kill more time.
self.process_manager = salt.utils.process.ProcessManager(wait_for_kill=5)
pub_channels = []
diff --git a/salt/pillar/__init__.py b/salt/pillar/__init__.py
index 6b36cda708..81a0417d0f 100644
--- a/salt/pillar/__init__.py
+++ b/salt/pillar/__init__.py
@@ -13,7 +13,7 @@ import logging
import salt.ext.tornado.gen
import sys
import traceback
-import inspect
+import uuid
# Import salt libs
import salt.loader
@@ -178,6 +178,9 @@ class AsyncRemotePillar(RemotePillarMixin):
load,
dictkey='pillar',
)
+ except salt.crypt.AuthenticationError as exc:
+ log.error(exc.message)
+ raise SaltClientError("Exception getting pillar.")
except Exception: # pylint: disable=broad-except
log.exception('Exception getting pillar:')
raise SaltClientError('Exception getting pillar.')
diff --git a/salt/transport/mixins/auth.py b/salt/transport/mixins/auth.py
index ec736227ab..cfc1ef36a8 100644
--- a/salt/transport/mixins/auth.py
+++ b/salt/transport/mixins/auth.py
@@ -107,10 +107,10 @@ class AESReqServerMixin(object):
self.master_key = salt.crypt.MasterKeys(self.opts)
- def _encrypt_private(self, ret, dictkey, target):
- '''
+ def _encrypt_private(self, ret, dictkey, target, nonce=None, sign_messages=True):
+ """
The server equivalent of ReqChannel.crypted_transfer_decode_dictentry
- '''
+ """
# encrypt with a specific AES key
pubfn = os.path.join(self.opts['pki_dir'],
'minions',
@@ -124,9 +124,8 @@ class AESReqServerMixin(object):
except (ValueError, IndexError, TypeError):
return self.crypticle.dumps({})
except IOError:
- log.error('AES key not found')
- return {'error': 'AES key not found'}
-
+ log.error("AES key not found")
+ return {"error": "AES key not found"}
pret = {}
if not six.PY2:
key = salt.utils.stringutils.to_bytes(key)
@@ -134,12 +133,34 @@ class AESReqServerMixin(object):
pret['key'] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(pub)
- pret['key'] = cipher.encrypt(key)
- pret[dictkey] = pcrypt.dumps(
- ret if ret is not False else {}
- )
+ pret["key"] = cipher.encrypt(key)
+ if ret is False:
+ ret = {}
+ if sign_messages:
+ if nonce is None:
+ return {"error": "Nonce not included in request"}
+ tosign = salt.payload.Serial({}).dumps(
+ {"key": pret["key"], "pillar": ret, "nonce": nonce}
+ )
+ master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
+ signed_msg = {
+ "data": tosign,
+ "sig": salt.crypt.sign_message(master_pem_path, tosign),
+ }
+ pret[dictkey] = pcrypt.dumps(signed_msg)
+ else:
+ pret[dictkey] = pcrypt.dumps(ret)
return pret
+ def _clear_signed(self, load):
+ master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
+ tosign = salt.payload.Serial({}).dumps(load)
+ return {
+ "enc": "clear",
+ "load": tosign,
+ "sig": salt.crypt.sign_message(master_pem_path, tosign),
+ }
+
def _update_aes(self):
'''
Check to see if a fresh AES key is available and update the components
@@ -161,8 +182,8 @@ class AESReqServerMixin(object):
payload['load'] = self.crypticle.loads(payload['load'])
return payload
- def _auth(self, load):
- '''
+ def _auth(self, load, sign_messages=False):
+ """
Authenticate the client, use the sent public key to encrypt the AES key
which was generated at start up.
@@ -175,13 +196,15 @@ class AESReqServerMixin(object):
# Make an RSA key with the pub key
# Encrypt the AES key as an encrypted salt.payload
# Package the return and return it
- '''
+ """
- if not salt.utils.verify.valid_id(self.opts, load['id']):
- log.info('Authentication request from invalid id %s', load['id'])
- return {'enc': 'clear',
- 'load': {'ret': False}}
- log.info('Authentication request from %s', load['id'])
+ if not salt.utils.verify.valid_id(self.opts, load["id"]):
+ log.info("Authentication request from invalid id %s", load["id"])
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
+ log.info("Authentication request from %s", load["id"])
# 0 is default which should be 'unlimited'
if self.opts['max_minions'] > 0:
@@ -209,10 +232,16 @@ class AESReqServerMixin(object):
'id': load['id'],
'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': 'full'}}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(
+ eload, salt.utils.event.tagify(prefix="auth")
+ )
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": "full", "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": "full"}}
# Check if key is configured to be auto-rejected/signed
auto_reject = self.auto_key.check_autoreject(load['id'])
@@ -236,16 +265,17 @@ class AESReqServerMixin(object):
pass
elif os.path.isfile(pubfn_rejected):
# The key has been rejected, don't place it in pending
- log.info('Public key rejected for %s. Key is present in '
- 'rejection key dir.', load['id'])
- eload = {'result': False,
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
-
+ log.info(
+ "Public key rejected for %s. Key is present in " "rejection key dir.",
+ load["id"],
+ )
+ eload = {"result": False, "id": load["id"], "pub": load["pub"]}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
elif os.path.isfile(pubfn):
# The key has been accepted, check it
with salt.utils.files.fopen(pubfn, 'r') as pubfn_handle:
@@ -256,29 +286,37 @@ class AESReqServerMixin(object):
'the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
- with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
- fp_.write(load['pub'])
- eload = {'result': False,
- 'id': load['id'],
- 'act': 'denied',
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ with salt.utils.files.fopen(pubfn_denied, "w+") as fp_:
+ fp_.write(load["pub"])
+ eload = {
+ "result": False,
+ "id": load["id"],
+ "act": "denied",
+ "pub": load["pub"],
+ }
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(
+ eload, salt.utils.event.tagify(prefix="auth")
+ )
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": False, "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
elif not os.path.isfile(pubfn_pend):
# The key has not been accepted, this is a new minion
if os.path.isdir(pubfn_pend):
# The key path is a directory, error out
- log.info('New public key %s is a directory', load['id'])
- eload = {'result': False,
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ log.info("New public key %s is a directory", load["id"])
+ eload = {"result": False, "id": load["id"], "pub": load["pub"]}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
if auto_reject:
key_path = pubfn_rejected
@@ -297,17 +335,22 @@ class AESReqServerMixin(object):
if key_path is not None:
# Write the key to the appropriate location
- with salt.utils.files.fopen(key_path, 'w+') as fp_:
- fp_.write(load['pub'])
- ret = {'enc': 'clear',
- 'load': {'ret': key_result}}
- eload = {'result': key_result,
- 'act': key_act,
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return ret
+ with salt.utils.files.fopen(key_path, "w+") as fp_:
+ fp_.write(load["pub"])
+ eload = {
+ "result": key_result,
+ "act": key_act,
+ "id": load["id"],
+ "pub": load["pub"],
+ }
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": key_result, "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": key_result}}
elif os.path.isfile(pubfn_pend):
# This key is in the pending dir and is awaiting acceptance
@@ -319,17 +362,22 @@ class AESReqServerMixin(object):
shutil.move(pubfn_pend, pubfn_rejected)
except (IOError, OSError):
pass
- log.info('Pending public key for %s rejected via '
- 'autoreject_file', load['id'])
- ret = {'enc': 'clear',
- 'load': {'ret': False}}
- eload = {'result': False,
- 'act': 'reject',
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return ret
+ log.info(
+ "Pending public key for %s rejected via " "autoreject_file",
+ load["id"],
+ )
+ eload = {
+ "result": False,
+ "act": "reject",
+ "id": load["id"],
+ "pub": load["pub"],
+ }
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
elif not auto_sign:
# This key is in the pending dir and is not being auto-signed.
@@ -344,30 +392,46 @@ class AESReqServerMixin(object):
'attempt to compromise the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
- with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
- fp_.write(load['pub'])
- eload = {'result': False,
- 'id': load['id'],
- 'act': 'denied',
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ with salt.utils.files.fopen(pubfn_denied, "w+") as fp_:
+ fp_.write(load["pub"])
+ eload = {
+ "result": False,
+ "id": load["id"],
+ "act": "denied",
+ "pub": load["pub"],
+ }
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(
+ eload, salt.utils.event.tagify(prefix="auth")
+ )
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": False, "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
else:
log.info(
'Authentication failed from host %s, the key is in '
'pending and needs to be accepted with salt-key '
'-a %s', load['id'], load['id']
)
- eload = {'result': True,
- 'act': 'pend',
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': True}}
+ eload = {
+ "result": True,
+ "act": "pend",
+ "id": load["id"],
+ "pub": load["pub"],
+ }
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(
+ eload, salt.utils.event.tagify(prefix="auth")
+ )
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": True, "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": True}}
else:
# This key is in pending and has been configured to be
# auto-signed. Check to see if it is the same key, and if
@@ -381,28 +445,32 @@ class AESReqServerMixin(object):
'attempt to compromise the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
- with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
- fp_.write(load['pub'])
- eload = {'result': False,
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ with salt.utils.files.fopen(pubfn_denied, "w+") as fp_:
+ fp_.write(load["pub"])
+ eload = {"result": False, "id": load["id"], "pub": load["pub"]}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(
+ eload, salt.utils.event.tagify(prefix="auth")
+ )
+ if sign_messages:
+ return self._clear_signed(
+ {"ret": False, "nonce": load["nonce"]}
+ )
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
else:
os.remove(pubfn_pend)
else:
# Something happened that I have not accounted for, FAIL!
- log.warning('Unaccounted for authentication failure')
- eload = {'result': False,
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ log.warning("Unaccounted for authentication failure")
+ eload = {"result": False, "id": load["id"], "pub": load["pub"]}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
log.info('Authentication accepted from %s', load['id'])
# only write to disk if you are adding the file, and in open mode,
@@ -415,14 +483,16 @@ class AESReqServerMixin(object):
if os.path.isfile(pubfn):
with salt.utils.files.fopen(pubfn, 'r') as fp_:
disk_key = fp_.read()
- if load['pub'] and load['pub'] != disk_key:
- log.debug('Host key change detected in open mode.')
- with salt.utils.files.fopen(pubfn, 'w+') as fp_:
- fp_.write(load['pub'])
- elif not load['pub']:
- log.error('Public key is empty: {0}'.format(load['id']))
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ if load["pub"] and load["pub"] != disk_key:
+ log.debug("Host key change detected in open mode.")
+ with salt.utils.files.fopen(pubfn, "w+") as fp_:
+ fp_.write(load["pub"])
+ elif not load["pub"]:
+ log.error("Public key is empty: %s", load["id"])
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
pub = None
@@ -436,8 +506,10 @@ class AESReqServerMixin(object):
pub = salt.crypt.get_rsa_pub_key(pubfn)
except salt.crypt.InvalidKeyError as err:
log.error('Corrupt public key "%s": %s', pubfn, err)
- return {'enc': 'clear',
- 'load': {'ret': False}}
+ if sign_messages:
+ return self._clear_signed({"ret": False, "nonce": load["nonce"]})
+ else:
+ return {"enc": "clear", "load": {"ret": False}}
if not HAS_M2:
cipher = PKCS1_OAEP.new(pub)
@@ -507,14 +579,15 @@ class AESReqServerMixin(object):
ret['aes'] = pub.public_encrypt(aes,
RSA.pkcs1_oaep_padding)
else:
- ret['aes'] = cipher.encrypt(aes)
+ ret["aes"] = cipher.encrypt(aes)
+
# Be aggressive about the signature
digest = salt.utils.stringutils.to_bytes(hashlib.sha256(aes).hexdigest())
- ret['sig'] = salt.crypt.private_encrypt(self.master_key.key, digest)
- eload = {'result': True,
- 'act': 'accept',
- 'id': load['id'],
- 'pub': load['pub']}
- if self.opts.get('auth_events') is True:
- self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
+ ret["sig"] = salt.crypt.private_encrypt(self.master_key.key, digest)
+ eload = {"result": True, "act": "accept", "id": load["id"], "pub": load["pub"]}
+ if self.opts.get("auth_events") is True:
+ self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth"))
+ if sign_messages:
+ ret["nonce"] = load["nonce"]
+ return self._clear_signed(ret)
return ret
diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py
index 12ef24e86f..9b2f6641ab 100644
--- a/salt/transport/tcp.py
+++ b/salt/transport/tcp.py
@@ -16,6 +16,7 @@ import sys
import time
import threading
import traceback
+import uuid
import weakref
# Import Salt Libs
@@ -339,12 +340,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
def _package_load(self, load):
return {
- 'enc': self.crypt,
- 'load': load,
+ "enc": self.crypt,
+ "load": load,
+ "version": 2,
}
@salt.ext.tornado.gen.coroutine
- def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
+ def crypted_transfer_decode_dictentry(
+ self, load, dictkey=None, tries=3, timeout=60
+ ):
+ nonce = uuid.uuid4().hex
+ load["nonce"] = nonce
if not self.auth.authenticated:
yield self.auth.authenticate()
ret = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout)
@@ -353,12 +359,30 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
aes = key.private_decrypt(ret['key'], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
- aes = cipher.decrypt(ret['key'])
+ aes = cipher.decrypt(ret["key"])
+
+ # Decrypt using the public key.
pcrypt = salt.crypt.Crypticle(self.opts, aes)
- data = pcrypt.loads(ret[dictkey])
- if six.PY3:
- data = salt.transport.frame.decode_embedded_strs(data)
- raise salt.ext.tornado.gen.Return(data)
+ signed_msg = pcrypt.loads(ret[dictkey])
+
+ # Validate the master's signature.
+ master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub")
+ if not salt.crypt.verify_signature(
+ master_pubkey_path, signed_msg["data"], signed_msg["sig"]
+ ):
+ raise salt.crypt.AuthenticationError(
+ "Pillar payload signature failed to validate."
+ )
+
+ # Make sure the signed key matches the key we used to decrypt the data.
+ data = salt.payload.Serial({}).loads(signed_msg["data"])
+ if data["key"] != ret["key"]:
+ raise salt.crypt.AuthenticationError("Key verification failed.")
+
+ # Validate the nonce.
+ if data["nonce"] != nonce:
+ raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
+ raise salt.ext.tornado.gen.Return(data["pillar"])
@salt.ext.tornado.gen.coroutine
def _crypted_transfer(self, load, tries=3, timeout=60):
@@ -368,6 +392,10 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
Indeed, we can fail too early in case of a master restart during a
minion state execution call
'''
+ nonce = uuid.uuid4().hex
+ if load and isinstance(load, dict):
+ load["nonce"] = nonce
+
@salt.ext.tornado.gen.coroutine
def _do_transfer():
data = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)),
@@ -378,9 +406,8 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
# communication, we do not subscribe to return events, we just
# upload the results to the master
if data:
- data = self.auth.crypticle.loads(data)
- if six.PY3:
- data = salt.transport.frame.decode_embedded_strs(data)
+ data = self.auth.crypticle.loads(data, nonce=nonce)
+ data = salt.transport.frame.decode_embedded_strs(data)
raise salt.ext.tornado.gen.Return(data)
if not self.auth.authenticated:
@@ -448,8 +475,9 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
def _package_load(self, load):
return {
- 'enc': self.crypt,
- 'load': load,
+ "enc": self.crypt,
+ "load": load,
+ "version": 2,
}
@salt.ext.tornado.gen.coroutine
@@ -719,13 +747,31 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
stream.send(self.serial.dumps('bad load: id {0} is not a string'.format(id_)))
raise salt.ext.tornado.gen.Return()
+ version = 0
+ if "version" in payload:
+ version = payload["version"]
+
+ sign_messages = False
+ if version > 1:
+ sign_messages = True
+
# intercept the "_auth" commands, since the main daemon shouldn't know
# anything about our key auth
- if payload['enc'] == 'clear' and payload.get('load', {}).get('cmd') == '_auth':
- yield stream.write(salt.transport.frame.frame_msg(
- self._auth(payload['load']), header=header))
+ if (
+ payload["enc"] == "clear"
+ and payload.get("load", {}).get("cmd") == "_auth"
+ ):
+ yield stream.write(
+ salt.transport.frame.frame_msg(
+ self._auth(payload["load"], sign_messages), header=header
+ )
+ )
raise salt.ext.tornado.gen.Return()
+ nonce = None
+ if version > 1:
+ nonce = payload["load"].pop("nonce", None)
+
# TODO: test
try:
ret, req_opts = yield self.payload_handler(payload)
@@ -739,13 +785,21 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
req_fun = req_opts.get('fun', 'send')
if req_fun == 'send_clear':
stream.write(salt.transport.frame.frame_msg(ret, header=header))
- elif req_fun == 'send':
- stream.write(salt.transport.frame.frame_msg(self.crypticle.dumps(ret), header=header))
- elif req_fun == 'send_private':
- stream.write(salt.transport.frame.frame_msg(self._encrypt_private(ret,
- req_opts['key'],
- req_opts['tgt'],
- ), header=header))
+ elif req_fun == "send":
+ stream.write(
+ salt.transport.frame.frame_msg(
+ self.crypticle.dumps(ret, nonce), header=header
+ )
+ )
+ elif req_fun == "send_private":
+ stream.write(
+ salt.transport.frame.frame_msg(
+ self._encrypt_private(
+ ret, req_opts["key"], req_opts["tgt"], nonce, sign_messages,
+ ),
+ header=header,
+ )
+ )
else:
log.error('Unknown req_fun %s', req_fun)
# always attempt to return an error to the minion
@@ -1274,8 +1328,9 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object):
'''
TCP publisher
'''
- def __init__(self, opts, io_loop=None):
- super(PubServer, self).__init__(ssl_options=opts.get('ssl'))
+
+ def __init__(self, opts, io_loop=None, pack_publish=lambda _: _):
+ super().__init__(ssl_options=opts.get("ssl"))
self.io_loop = io_loop
self.opts = opts
self._closing = False
@@ -1300,6 +1355,10 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object):
opts=self.opts,
listen=False
)
+ self._pack_publish = pack_publish
+
+ def pack_publish(self, payload):
+ return self._pack_publish(payload)
def close(self):
if self._closing:
@@ -1406,8 +1465,9 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object):
# TODO: ACK the publish through IPC
@salt.ext.tornado.gen.coroutine
def publish_payload(self, package, _):
- log.debug('TCP PubServer sending payload: %s', package)
- payload = salt.transport.frame.frame_msg(package['payload'])
+ log.debug("TCP PubServer sending payload: %s", package)
+ package = self.pack_publish(package)
+ payload = salt.transport.frame.frame_msg(package["payload"])
to_remove = []
if 'topic_lst' in package:
@@ -1482,7 +1542,9 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()
# Spin up the publisher
- pub_server = PubServer(self.opts, io_loop=self.io_loop)
+ pub_server = PubServer(
+ self.opts, io_loop=self.io_loop, pack_publish=self.pack_publish
+ )
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(sock, self.opts)
@@ -1523,30 +1585,20 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
'''
process_manager.add_process(self._publish_daemon, kwargs=kwargs)
- def publish(self, load):
- '''
+ def pack_publish(self, load):
+ """
Publish "load" to minions
- '''
- payload = {'enc': 'aes'}
-
- crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
- payload['load'] = crypticle.dumps(load)
- if self.opts['sign_pub_messages']:
- master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
- log.debug("Signing data packet")
- payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
- # Use the Salt IPC server
- if self.opts.get('ipc_mode', '') == 'tcp':
- pull_uri = int(self.opts.get('tcp_master_publish_pull', 4514))
- else:
- pull_uri = os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
- # TODO: switch to the actual asynchronous interface
- #pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop)
- pub_sock = salt.utils.asynchronous.SyncWrapper(
- salt.transport.ipc.IPCMessageClient,
- (pull_uri,)
+ """
+ payload = {"enc": "aes"}
+ load["serial"] = salt.master.SMaster.get_serial()
+ crypticle = salt.crypt.Crypticle(
+ self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
)
- pub_sock.connect()
+ payload["load"] = crypticle.dumps(load)
+ if self.opts["sign_pub_messages"]:
+ master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
+ log.debug("Signing data packet")
+ payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"])
int_payload = {'payload': self.serial.dumps(payload)}
@@ -1562,6 +1614,23 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
# Send list of miions thru so zmq can target them
int_payload['topic_lst'] = match_ids
else:
- int_payload['topic_lst'] = load['tgt']
+ int_payload["topic_lst"] = load["tgt"]
+ return int_payload
+
+ def publish(self, load):
+ """
+ Publish "load" to minions
+ """
# Send it over IPC!
- pub_sock.send(int_payload)
+ # Use the Salt IPC server
+ # TODO: switch to the actual asynchronous interface
+ # pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop)
+ if self.opts.get("ipc_mode", "") == "tcp":
+ pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
+ else:
+ pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
+ pub_sock = salt.utils.asynchronous.SyncWrapper(
+ salt.transport.ipc.IPCMessageClient, (pull_uri,), loop_kwarg="io_loop",
+ )
+ pub_sock.connect()
+ pub_sock.send(load)
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index a5844e9132..540abb8853 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -13,6 +13,8 @@ import signal
import socket
import hashlib
import logging
+import signal
+import uuid
import weakref
import threading
from random import randint
@@ -21,6 +23,7 @@ from random import randint
import salt.auth
import salt.crypt
import salt.log.setup
+import salt.utils.crypt
import salt.utils.event
import salt.utils.files
import salt.utils.minions
@@ -65,6 +68,7 @@ except ImportError:
except ImportError:
from Crypto.Cipher import PKCS1_OAEP
+
log = logging.getLogger(__name__)
@@ -79,11 +83,12 @@ def _get_master_uri(master_ip,
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
'''
- from salt.utils.zeromq import ip_bracket
- master_uri = 'tcp://{master_ip}:{master_port}'.format(
- master_ip=ip_bracket(master_ip), master_port=master_port)
+ from salt.utils.zeromq import ip_bracket
+ master_uri = "tcp://{master_ip}:{master_port}".format(
+ master_ip=ip_bracket(master_ip), master_port=master_port
+ )
if source_ip or source_port:
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
@@ -292,23 +297,30 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
def _package_load(self, load):
return {
- 'enc': self.crypt,
- 'load': load,
+ "enc": self.crypt,
+ "load": load,
+ "version": 2,
}
@salt.ext.tornado.gen.coroutine
- def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
+ def crypted_transfer_decode_dictentry(
+ self, load, dictkey=None, tries=3, timeout=60
+ ):
+ nonce = uuid.uuid4().hex
+ load["nonce"] = nonce
if not self.auth.authenticated:
# Return control back to the caller, continue when authentication succeeds
yield self.auth.authenticate()
- # Return control to the caller. When send() completes, resume by populating ret with the Future.result
+
+ # Return control to the caller. When send() completes, resume by
+ # populating ret with the Future.result
ret = yield self.message_client.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
- key = self.auth.get_keys()
- if 'key' not in ret:
+
+ if "key" not in ret:
# Reauth in the case our key is deleted on the master side.
yield self.auth.authenticate()
ret = yield self.message_client.send(
@@ -316,17 +328,37 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
timeout=timeout,
tries=tries,
)
+
+ key = self.auth.get_keys()
if HAS_M2:
aes = key.private_decrypt(ret['key'],
RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
- aes = cipher.decrypt(ret['key'])
+ aes = cipher.decrypt(ret["key"])
+
+ # Decrypt using the public key.
pcrypt = salt.crypt.Crypticle(self.opts, aes)
- data = pcrypt.loads(ret[dictkey])
- if six.PY3:
- data = salt.transport.frame.decode_embedded_strs(data)
- raise salt.ext.tornado.gen.Return(data)
+ signed_msg = pcrypt.loads(ret[dictkey])
+
+ # Validate the master's signature.
+ master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub")
+ if not salt.crypt.verify_signature(
+ master_pubkey_path, signed_msg["data"], signed_msg["sig"]
+ ):
+ raise salt.crypt.AuthenticationError(
+ "Pillar payload signature failed to validate."
+ )
+
+ # Make sure the signed key matches the key we used to decrypt the data.
+ data = salt.payload.Serial({}).loads(signed_msg["data"])
+ if data["key"] != ret["key"]:
+ raise salt.crypt.AuthenticationError("Key verification failed.")
+
+ # Validate the nonce.
+ if data["nonce"] != nonce:
+ raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
+ raise salt.ext.tornado.gen.Return(data["pillar"])
@salt.ext.tornado.gen.coroutine
def _crypted_transfer(self, load, tries=3, timeout=60, raw=False):
@@ -343,6 +375,10 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
:param int tries: The number of times to make before failure
:param int timeout: The number of seconds on a response before failing
'''
+ nonce = uuid.uuid4().hex
+ if load and isinstance(load, dict):
+ load["nonce"] = nonce
+
@salt.ext.tornado.gen.coroutine
def _do_transfer():
# Yield control to the caller. When send() completes, resume by populating data with the Future.result
@@ -356,7 +392,7 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# communication, we do not subscribe to return events, we just
# upload the results to the master
if data:
- data = self.auth.crypticle.loads(data, raw)
+ data = self.auth.crypticle.loads(data, raw, nonce)
if six.PY3 and not raw:
data = salt.transport.frame.decode_embedded_strs(data)
raise salt.ext.tornado.gen.Return(data)
@@ -774,12 +810,24 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,
stream.send(self.serial.dumps('bad load: id {0} is not a string'.format(id_)))
raise salt.ext.tornado.gen.Return()
+ version = 0
+ if "version" in payload:
+ version = payload["version"]
+
+ sign_messages = False
+ if version > 1:
+ sign_messages = True
+
# intercept the "_auth" commands, since the main daemon shouldn't know
# anything about our key auth
- if payload['enc'] == 'clear' and payload.get('load', {}).get('cmd') == '_auth':
- stream.send(self.serial.dumps(self._auth(payload['load'])))
+ if payload["enc"] == "clear" and payload.get("load", {}).get("cmd") == "_auth":
+ stream.send(self.serial.dumps(self._auth(payload["load"], sign_messages)))
raise salt.ext.tornado.gen.Return()
+ nonce = None
+ if version > 1:
+ nonce = payload["load"].pop("nonce", None)
+
# TODO: test
try:
# Take the payload_handler function that was registered when we created the channel
@@ -794,13 +842,16 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,
req_fun = req_opts.get('fun', 'send')
if req_fun == 'send_clear':
stream.send(self.serial.dumps(ret))
- elif req_fun == 'send':
- stream.send(self.serial.dumps(self.crypticle.dumps(ret)))
- elif req_fun == 'send_private':
- stream.send(self.serial.dumps(self._encrypt_private(ret,
- req_opts['key'],
- req_opts['tgt'],
- )))
+ elif req_fun == "send":
+ stream.send(self.serial.dumps(self.crypticle.dumps(ret, nonce)))
+ elif req_fun == "send_private":
+ stream.send(
+ self.serial.dumps(
+ self._encrypt_private(
+ ret, req_opts["key"], req_opts["tgt"], nonce, sign_messages,
+ )
+ )
+ )
else:
log.error('Unknown req_fun %s', req_fun)
# always attempt to return an error to the minion
@@ -878,6 +929,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
if log_queue:
salt.log.setup.set_multiprocessing_logging_queue(log_queue)
salt.log.setup.setup_multiprocessing_logging(log_queue)
+ salt.utils.crypt.reinit_crypto()
# Set up the context
context = zmq.Context(1)
@@ -929,7 +981,9 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
try:
log.debug('Publish daemon getting data from puller %s', pull_uri)
package = pull_sock.recv()
- log.debug('Publish daemon received payload. size=%d', len(package))
+ log.debug("Publish daemon received payload. size=%d", len(package))
+ load = salt.payload.Serial({}).loads(package)
+ package = self.pack_publish(load)
unpacked_package = salt.payload.unpackage(package)
if six.PY3:
@@ -1012,17 +1066,20 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
self.pub_close()
ctx = zmq.Context.instance()
self._sock_data.sock = ctx.socket(zmq.PUSH)
- self.pub_sock.setsockopt(zmq.LINGER, -1)
- if self.opts.get('ipc_mode', '') == 'tcp':
- pull_uri = 'tcp://127.0.0.1:{0}'.format(
- self.opts.get('tcp_master_publish_pull', 4514)
- )
+ self._sock_data.sock.setsockopt(zmq.LINGER, -1)
+ self._sock_data.sock.setsockopt(zmq.SNDHWM, self.opts.get("pub_hwm", 1000))
+ self._sock_data.sock.setsockopt(zmq.RCVHWM, self.opts.get("pub_hwm", 1000))
+ self._sock_data.sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000))
+ if self.opts.get("ipc_mode", "") == "tcp":
+ pull_uri = "tcp://127.0.0.1:{}".format(
+ self.opts.get("tcp_master_publish_pull", 4514)
+ )
else:
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
log.debug("Connecting to pub server: %s", pull_uri)
- self.pub_sock.connect(pull_uri)
+ self._sock_data.sock.connect(pull_uri)
return self._sock_data.sock
def pub_close(self):
@@ -1032,20 +1089,23 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
if hasattr(self._sock_data, 'sock'):
self._sock_data.sock.close()
- delattr(self._sock_data, 'sock')
+ self._sock_data.sock = None
- def publish(self, load):
- '''
- Publish "load" to minions. This send the load to the publisher daemon
- process with does the actual sending to minions.
+ def pack_publish(self, load):
+ """
+ Package the "load" for a publish to minions. This send the load to the
+ publisher daemon process with does the actual sending to minions.
:param dict load: A load to be sent across the wire to minions
- '''
- payload = {'enc': 'aes'}
- crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
- payload['load'] = crypticle.dumps(load)
- if self.opts['sign_pub_messages']:
- master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
+ """
+ payload = {"enc": "aes"}
+ load["serial"] = salt.master.SMaster.get_serial()
+ crypticle = salt.crypt.Crypticle(
+ self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
+ )
+ payload["load"] = crypticle.dumps(load)
+ if self.opts["sign_pub_messages"]:
+ master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
int_payload = {'payload': self.serial.dumps(payload)}
@@ -1070,10 +1130,19 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'Sending payload to publish daemon. jid=%s size=%d',
load.get('jid', None), len(payload),
)
+ return payload
+
+ def publish(self, load):
+ """
+ Publish "load" to minions. This send the load to the publisher daemon
+ process with does the actual sending to minions.
+
+ :param dict load: A load to be sent across the wire to minions
+ """
if not self.pub_sock:
self.pub_connect()
- self.pub_sock.send(payload)
- log.debug('Sent payload to publish daemon.')
+ self.pub_sock.send(self.serial.dumps(load))
+ log.debug("Sent payload to publish daemon.")
class AsyncReqMessageClientPool(salt.transport.MessageClientPool):
diff --git a/salt/utils/minions.py b/salt/utils/minions.py
index b02ab34777..6995a29131 100644
--- a/salt/utils/minions.py
+++ b/salt/utils/minions.py
@@ -742,21 +742,28 @@ class CkMinions(object):
return _res
def validate_tgt(self, valid, expr, tgt_type, minions=None, expr_form=None):
- '''
- Return a Bool. This function returns if the expression sent in is
- within the scope of the valid expression
- '''
+ """
+ Validate the target minions against the possible valid minions.
+
+ If ``minions`` is provided, they will be compared against the valid
+ minions. Otherwise, ``expr`` and ``tgt_type`` will be used to expand
+ to a list of target minions.
- v_minions = set(self.check_minions(valid, 'compound').get('minions', []))
+ Return True if all of the requested minions are valid minions,
+ otherwise return False.
+ """
+
+ v_minions = set(self.check_minions(valid, "compound").get("minions", []))
+ if not v_minions:
+ # There are no valid minions, so it doesn't matter what we are
+ # targeting - this is a fail.
+ return False
if minions is None:
_res = self.check_minions(expr, tgt_type)
minions = set(_res['minions'])
else:
minions = set(minions)
- d_bool = not bool(minions.difference(v_minions))
- if len(v_minions) == len(minions) and d_bool:
- return True
- return d_bool
+ return minions.issubset(v_minions)
def match_check(self, regex, fun):
'''
diff --git a/tests/pytests/unit/test_crypt.py b/tests/pytests/unit/test_crypt.py
index aa8f439b8c..6ffd912166 100644
--- a/tests/pytests/unit/test_crypt.py
+++ b/tests/pytests/unit/test_crypt.py
@@ -4,10 +4,159 @@ tests.pytests.unit.test_crypt
Unit tests for salt's crypt module
"""
+import uuid
+
import pytest
import salt.crypt
+import salt.master
import salt.utils.files
+PRIV_KEY = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAoAsMPt+4kuIG6vKyw9r3+OuZrVBee/2vDdVetW+Js5dTlgrJ
+aghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLnyHNJ/HpVhMG0M07MF6FMfILtDrrt8
+ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+fu6HYwu96HggmG2pqkOrn3iGfqBvV
+YVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpef8vRUrNicRLc7dAcvfhtgt2DXEZ2
+d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvTIIPQIjR8htFxGTz02STVXfnhnJ0Z
+k8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cYOwIDAQABAoIBABZUJEO7Y91+UnfC
+H6XKrZEZkcnH7j6/UIaOD9YhdyVKxhsnax1zh1S9vceNIgv5NltzIsfV6vrb6v2K
+Dx/F7Z0O0zR5o+MlO8ZncjoNKskex10gBEWG00Uqz/WPlddiQ/TSMJTv3uCBAzp+
+S2Zjdb4wYPUlgzSgb2ygxrhsRahMcSMG9PoX6klxMXFKMD1JxiY8QfAHahPzQXy9
+F7COZ0fCVo6BE+MqNuQ8tZeIxu8mOULQCCkLFwXmkz1FpfK/kNRmhIyhxwvCS+z4
+JuErW3uXfE64RLERiLp1bSxlDdpvRO2R41HAoNELTsKXJOEt4JANRHm/CeyA5wsh
+NpscufUCgYEAxhgPfcMDy2v3nL6KtkgYjdcOyRvsAF50QRbEa8ldO+87IoMDD/Oe
+osFERJ5hhyyEO78QnaLVegnykiw5DWEF02RKMhD/4XU+1UYVhY0wJjKQIBadsufB
+2dnaKjvwzUhPh5BrBqNHl/FXwNCRDiYqXa79eWCPC9OFbZcUWWq70s8CgYEAztOI
+61zRfmXJ7f70GgYbHg+GA7IrsAcsGRITsFR82Ho0lqdFFCxz7oK8QfL6bwMCGKyk
+nzk+twh6hhj5UNp18KN8wktlo02zTgzgemHwaLa2cd6xKgmAyuPiTgcgnzt5LVNG
+FOjIWkLwSlpkDTl7ZzY2QSy7t+mq5d750fpIrtUCgYBWXZUbcpPL88WgDB7z/Bjg
+dlvW6JqLSqMK4b8/cyp4AARbNp12LfQC55o5BIhm48y/M70tzRmfvIiKnEc/gwaE
+NJx4mZrGFFURrR2i/Xx5mt/lbZbRsmN89JM+iKWjCpzJ8PgIi9Wh9DIbOZOUhKVB
+9RJEAgo70LvCnPTdS0CaVwKBgDJW3BllAvw/rBFIH4OB/vGnF5gosmdqp3oGo1Ik
+jipmPAx6895AH4tquIVYrUl9svHsezjhxvjnkGK5C115foEuWXw0u60uiTiy+6Pt
+2IS0C93VNMulenpnUrppE7CN2iWFAiaura0CY9fE/lsVpYpucHAWgi32Kok+ZxGL
+WEttAoGAN9Ehsz4LeQxEj3x8wVeEMHF6OsznpwYsI2oVh6VxpS4AjgKYqeLVcnNi
+TlZFsuQcqgod8OgzA91tdB+Rp86NygmWD5WzeKXpCOg9uA+y/YL+0sgZZHsuvbK6
+PllUgXdYxqClk/hdBFB7v9AQoaj7K9Ga22v32msftYDQRJ94xOI=
+-----END RSA PRIVATE KEY-----
+"""
+
+
+PUB_KEY = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoAsMPt+4kuIG6vKyw9r3
++OuZrVBee/2vDdVetW+Js5dTlgrJaghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLny
+HNJ/HpVhMG0M07MF6FMfILtDrrt8ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+f
+u6HYwu96HggmG2pqkOrn3iGfqBvVYVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpe
+f8vRUrNicRLc7dAcvfhtgt2DXEZ2d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvT
+IIPQIjR8htFxGTz02STVXfnhnJ0Zk8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cY
+OwIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+PRIV_KEY2 = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAp+8cTxguO6Vg+YO92VfHgNld3Zy8aM3JbZvpJcjTnis+YFJ7
+Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvTsMBZWvmUoEVUj1Xg8XXQkBvb9Ozy
+Gqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc2cKeCVvWFqDi0GRFGzyaXLaX3PPm
+M7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbuT1OqDfufXWQl/82JXeiwU2cOpqWq
+7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww3oJSwvMbAmgzvOhqqhlqv+K7u0u7
+FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQbQIDAQABAoIBAADrqWDQnd5DVZEA
+lR+WINiWuHJAy/KaIC7K4kAMBgbxrz2ZbiY9Ok/zBk5fcnxIZDVtXd1sZicmPlro
+GuWodIxdPZAnWpZ3UtOXUayZK/vCP1YsH1agmEqXuKsCu6Fc+K8VzReOHxLUkmXn
+FYM+tixGahXcjEOi/aNNTWitEB6OemRM1UeLJFzRcfyXiqzHpHCIZwBpTUAsmzcG
+QiVDkMTKubwo/m+PVXburX2CGibUydctgbrYIc7EJvyx/cpRiPZXo1PhHQWdu4Y1
+SOaC66WLsP/wqvtHo58JQ6EN/gjSsbAgGGVkZ1xMo66nR+pLpR27coS7o03xCks6
+DY/0mukCgYEAuLIGgBnqoh7YsOBLd/Bc1UTfDMxJhNseo+hZemtkSXz2Jn51322F
+Zw/FVN4ArXgluH+XsOhvG/MFFpojwZSrb0Qq5b1MRdo9qycq8lGqNtlN1WHqosDQ
+zW29kpL0tlRrSDpww3wRESsN9rH5XIrJ1b3ZXuO7asR+KBVQMy/+NcUCgYEA6MSC
+c+fywltKPgmPl5j0DPoDe5SXE/6JQy7w/vVGrGfWGf/zEJmhzS2R+CcfTTEqaT0T
+Yw8+XbFgKAqsxwtE9MUXLTVLI3sSUyE4g7blCYscOqhZ8ItCUKDXWkSpt++rG0Um
+1+cEJP/0oCazG6MWqvBC4NpQ1nzh46QpjWqMwokCgYAKDLXJ1p8rvx3vUeUJW6zR
+dfPlEGCXuAyMwqHLxXgpf4EtSwhC5gSyPOtx2LqUtcrnpRmt6JfTH4ARYMW9TMef
+QEhNQ+WYj213mKP/l235mg1gJPnNbUxvQR9lkFV8bk+AGJ32JRQQqRUTbU+yN2MQ
+HEptnVqfTp3GtJIultfwOQKBgG+RyYmu8wBP650izg33BXu21raEeYne5oIqXN+I
+R5DZ0JjzwtkBGroTDrVoYyuH1nFNEh7YLqeQHqvyufBKKYo9cid8NQDTu+vWr5UK
+tGvHnwdKrJmM1oN5JOAiq0r7+QMAOWchVy449VNSWWV03aeftB685iR5BXkstbIQ
+EVopAoGAfcGBTAhmceK/4Q83H/FXBWy0PAa1kZGg/q8+Z0KY76AqyxOVl0/CU/rB
+3tO3sKhaMTHPME/MiQjQQGoaK1JgPY6JHYvly2KomrJ8QTugqNGyMzdVJkXAK2AM
+GAwC8ivAkHf8CHrHa1W7l8t2IqBjW1aRt7mOW92nfG88Hck0Mbo=
+-----END RSA PRIVATE KEY-----
+"""
+
+
+PUB_KEY2 = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp+8cTxguO6Vg+YO92VfH
+gNld3Zy8aM3JbZvpJcjTnis+YFJ7Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvT
+sMBZWvmUoEVUj1Xg8XXQkBvb9OzyGqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc
+2cKeCVvWFqDi0GRFGzyaXLaX3PPmM7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbu
+T1OqDfufXWQl/82JXeiwU2cOpqWq7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww
+3oJSwvMbAmgzvOhqqhlqv+K7u0u7FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQ
+bQIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+
+def test_cryptical_dumps_no_nonce():
+ master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string())
+ data = {"foo": "bar"}
+ ret = master_crypt.dumps(data)
+
+ # Validate message structure
+ assert isinstance(ret, bytes)
+ une = master_crypt.decrypt(ret)
+ une.startswith(master_crypt.PICKLE_PAD)
+ assert salt.payload.Serial({}).loads(une[len(master_crypt.PICKLE_PAD) :]) == data
+
+ # Validate load back to orig data
+ assert master_crypt.loads(ret) == data
+
+
+def test_cryptical_dumps_valid_nonce():
+ nonce = uuid.uuid4().hex
+ master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string())
+ data = {"foo": "bar"}
+ ret = master_crypt.dumps(data, nonce=nonce)
+
+ assert isinstance(ret, bytes)
+ une = master_crypt.decrypt(ret)
+ une.startswith(master_crypt.PICKLE_PAD)
+ nonce_and_data = une[len(master_crypt.PICKLE_PAD) :]
+ assert nonce_and_data.startswith(nonce.encode())
+ assert salt.payload.Serial({}).loads(nonce_and_data[len(nonce) :]) == data
+
+ assert master_crypt.loads(ret, nonce=nonce) == data
+
+
+def test_cryptical_dumps_invalid_nonce():
+ nonce = uuid.uuid4().hex
+ master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string())
+ data = {"foo": "bar"}
+ ret = master_crypt.dumps(data, nonce=nonce)
+ assert isinstance(ret, bytes)
+ with pytest.raises(salt.crypt.SaltClientError, match="Nonce verification error"):
+ assert master_crypt.loads(ret, nonce="abcde")
+
+
+def test_verify_signature(tmpdir):
+ tmpdir.join("foo.pem").write(PRIV_KEY.strip())
+ tmpdir.join("foo.pub").write(PUB_KEY.strip())
+ tmpdir.join("bar.pem").write(PRIV_KEY2.strip())
+ tmpdir.join("bar.pub").write(PUB_KEY2.strip())
+ msg = b"foo bar"
+ sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg)
+ assert salt.crypt.verify_signature(str(tmpdir.join("foo.pub")), msg, sig)
+
+
+def test_verify_signature_bad_sig(tmpdir):
+ tmpdir.join("foo.pem").write(PRIV_KEY.strip())
+ tmpdir.join("foo.pub").write(PUB_KEY.strip())
+ tmpdir.join("bar.pem").write(PRIV_KEY2.strip())
+ tmpdir.join("bar.pub").write(PUB_KEY2.strip())
+ msg = b"foo bar"
+ sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg)
+ assert not salt.crypt.verify_signature(str(tmpdir.join("bar.pub")), msg, sig)
def test_get_rsa_pub_key_bad_key(tmp_path):
"""
diff --git a/tests/pytests/unit/transport/test_zeromq.py b/tests/pytests/unit/transport/test_zeromq.py
new file mode 100644
index 0000000000..16c8f1aa49
--- /dev/null
+++ b/tests/pytests/unit/transport/test_zeromq.py
@@ -0,0 +1,1042 @@
+"""
+ :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
+"""
+
+import ctypes
+import logging
+import multiprocessing
+import os
+import uuid
+
+import pytest
+import salt.config
+import salt.crypt
+import salt.exceptions
+import salt.ext.tornado.gen
+import salt.ext.tornado.ioloop
+import salt.log.setup
+import salt.transport.client
+import salt.transport.server
+import salt.transport.zeromq
+import salt.utils.platform
+import salt.utils.process
+import salt.utils.stringutils
+from salt.master import SMaster
+from tests.support.mock import MagicMock
+
+try:
+ from M2Crypto import RSA
+
+ HAS_M2 = True
+except ImportError:
+ HAS_M2 = False
+ try:
+ from Cryptodome.Cipher import PKCS1_OAEP
+ except ImportError:
+ from Crypto.Cipher import PKCS1_OAEP # nosec
+
+log = logging.getLogger(__name__)
+
+MASTER_PRIV_KEY = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAoAsMPt+4kuIG6vKyw9r3+OuZrVBee/2vDdVetW+Js5dTlgrJ
+aghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLnyHNJ/HpVhMG0M07MF6FMfILtDrrt8
+ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+fu6HYwu96HggmG2pqkOrn3iGfqBvV
+YVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpef8vRUrNicRLc7dAcvfhtgt2DXEZ2
+d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvTIIPQIjR8htFxGTz02STVXfnhnJ0Z
+k8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cYOwIDAQABAoIBABZUJEO7Y91+UnfC
+H6XKrZEZkcnH7j6/UIaOD9YhdyVKxhsnax1zh1S9vceNIgv5NltzIsfV6vrb6v2K
+Dx/F7Z0O0zR5o+MlO8ZncjoNKskex10gBEWG00Uqz/WPlddiQ/TSMJTv3uCBAzp+
+S2Zjdb4wYPUlgzSgb2ygxrhsRahMcSMG9PoX6klxMXFKMD1JxiY8QfAHahPzQXy9
+F7COZ0fCVo6BE+MqNuQ8tZeIxu8mOULQCCkLFwXmkz1FpfK/kNRmhIyhxwvCS+z4
+JuErW3uXfE64RLERiLp1bSxlDdpvRO2R41HAoNELTsKXJOEt4JANRHm/CeyA5wsh
+NpscufUCgYEAxhgPfcMDy2v3nL6KtkgYjdcOyRvsAF50QRbEa8ldO+87IoMDD/Oe
+osFERJ5hhyyEO78QnaLVegnykiw5DWEF02RKMhD/4XU+1UYVhY0wJjKQIBadsufB
+2dnaKjvwzUhPh5BrBqNHl/FXwNCRDiYqXa79eWCPC9OFbZcUWWq70s8CgYEAztOI
+61zRfmXJ7f70GgYbHg+GA7IrsAcsGRITsFR82Ho0lqdFFCxz7oK8QfL6bwMCGKyk
+nzk+twh6hhj5UNp18KN8wktlo02zTgzgemHwaLa2cd6xKgmAyuPiTgcgnzt5LVNG
+FOjIWkLwSlpkDTl7ZzY2QSy7t+mq5d750fpIrtUCgYBWXZUbcpPL88WgDB7z/Bjg
+dlvW6JqLSqMK4b8/cyp4AARbNp12LfQC55o5BIhm48y/M70tzRmfvIiKnEc/gwaE
+NJx4mZrGFFURrR2i/Xx5mt/lbZbRsmN89JM+iKWjCpzJ8PgIi9Wh9DIbOZOUhKVB
+9RJEAgo70LvCnPTdS0CaVwKBgDJW3BllAvw/rBFIH4OB/vGnF5gosmdqp3oGo1Ik
+jipmPAx6895AH4tquIVYrUl9svHsezjhxvjnkGK5C115foEuWXw0u60uiTiy+6Pt
+2IS0C93VNMulenpnUrppE7CN2iWFAiaura0CY9fE/lsVpYpucHAWgi32Kok+ZxGL
+WEttAoGAN9Ehsz4LeQxEj3x8wVeEMHF6OsznpwYsI2oVh6VxpS4AjgKYqeLVcnNi
+TlZFsuQcqgod8OgzA91tdB+Rp86NygmWD5WzeKXpCOg9uA+y/YL+0sgZZHsuvbK6
+PllUgXdYxqClk/hdBFB7v9AQoaj7K9Ga22v32msftYDQRJ94xOI=
+-----END RSA PRIVATE KEY-----
+"""
+
+
+MASTER_PUB_KEY = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoAsMPt+4kuIG6vKyw9r3
++OuZrVBee/2vDdVetW+Js5dTlgrJaghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLny
+HNJ/HpVhMG0M07MF6FMfILtDrrt8ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+f
+u6HYwu96HggmG2pqkOrn3iGfqBvVYVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpe
+f8vRUrNicRLc7dAcvfhtgt2DXEZ2d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvT
+IIPQIjR8htFxGTz02STVXfnhnJ0Zk8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cY
+OwIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+MASTER2_PRIV_KEY = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAp+8cTxguO6Vg+YO92VfHgNld3Zy8aM3JbZvpJcjTnis+YFJ7
+Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvTsMBZWvmUoEVUj1Xg8XXQkBvb9Ozy
+Gqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc2cKeCVvWFqDi0GRFGzyaXLaX3PPm
+M7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbuT1OqDfufXWQl/82JXeiwU2cOpqWq
+7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww3oJSwvMbAmgzvOhqqhlqv+K7u0u7
+FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQbQIDAQABAoIBAADrqWDQnd5DVZEA
+lR+WINiWuHJAy/KaIC7K4kAMBgbxrz2ZbiY9Ok/zBk5fcnxIZDVtXd1sZicmPlro
+GuWodIxdPZAnWpZ3UtOXUayZK/vCP1YsH1agmEqXuKsCu6Fc+K8VzReOHxLUkmXn
+FYM+tixGahXcjEOi/aNNTWitEB6OemRM1UeLJFzRcfyXiqzHpHCIZwBpTUAsmzcG
+QiVDkMTKubwo/m+PVXburX2CGibUydctgbrYIc7EJvyx/cpRiPZXo1PhHQWdu4Y1
+SOaC66WLsP/wqvtHo58JQ6EN/gjSsbAgGGVkZ1xMo66nR+pLpR27coS7o03xCks6
+DY/0mukCgYEAuLIGgBnqoh7YsOBLd/Bc1UTfDMxJhNseo+hZemtkSXz2Jn51322F
+Zw/FVN4ArXgluH+XsOhvG/MFFpojwZSrb0Qq5b1MRdo9qycq8lGqNtlN1WHqosDQ
+zW29kpL0tlRrSDpww3wRESsN9rH5XIrJ1b3ZXuO7asR+KBVQMy/+NcUCgYEA6MSC
+c+fywltKPgmPl5j0DPoDe5SXE/6JQy7w/vVGrGfWGf/zEJmhzS2R+CcfTTEqaT0T
+Yw8+XbFgKAqsxwtE9MUXLTVLI3sSUyE4g7blCYscOqhZ8ItCUKDXWkSpt++rG0Um
+1+cEJP/0oCazG6MWqvBC4NpQ1nzh46QpjWqMwokCgYAKDLXJ1p8rvx3vUeUJW6zR
+dfPlEGCXuAyMwqHLxXgpf4EtSwhC5gSyPOtx2LqUtcrnpRmt6JfTH4ARYMW9TMef
+QEhNQ+WYj213mKP/l235mg1gJPnNbUxvQR9lkFV8bk+AGJ32JRQQqRUTbU+yN2MQ
+HEptnVqfTp3GtJIultfwOQKBgG+RyYmu8wBP650izg33BXu21raEeYne5oIqXN+I
+R5DZ0JjzwtkBGroTDrVoYyuH1nFNEh7YLqeQHqvyufBKKYo9cid8NQDTu+vWr5UK
+tGvHnwdKrJmM1oN5JOAiq0r7+QMAOWchVy449VNSWWV03aeftB685iR5BXkstbIQ
+EVopAoGAfcGBTAhmceK/4Q83H/FXBWy0PAa1kZGg/q8+Z0KY76AqyxOVl0/CU/rB
+3tO3sKhaMTHPME/MiQjQQGoaK1JgPY6JHYvly2KomrJ8QTugqNGyMzdVJkXAK2AM
+GAwC8ivAkHf8CHrHa1W7l8t2IqBjW1aRt7mOW92nfG88Hck0Mbo=
+-----END RSA PRIVATE KEY-----
+"""
+
+
+MASTER2_PUB_KEY = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp+8cTxguO6Vg+YO92VfH
+gNld3Zy8aM3JbZvpJcjTnis+YFJ7Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvT
+sMBZWvmUoEVUj1Xg8XXQkBvb9OzyGqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc
+2cKeCVvWFqDi0GRFGzyaXLaX3PPmM7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbu
+T1OqDfufXWQl/82JXeiwU2cOpqWq7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww
+3oJSwvMbAmgzvOhqqhlqv+K7u0u7FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQ
+bQIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+
+MASTER_SIGNING_PRIV = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAtieqrBMTM0MSIbhPKkDcozHqyXKyL/+bXYYw+iVPsns7c7bJ
+zBqenLQlWoRVyrVyBFrrwQSrKu/0Mqn3l639iOGPlUoR3I7aZKIpyEdDkqd3xGIC
+e+BtNNDqhUai67L63hEdG+iYAchi8UZw3LZGtcGpJ3FkBH4cYFX9EOam2QjbD7WY
+EO7m1+j6XEYIOTCmAP9dGAvBbU0Jblc+wYxG3qNr+2dBWsK76QXWEqib2VSOGP+z
+gjJa8tqY7PXXdOJpalQXNphmD/4o4pHKR4Euy0yL/1oMkpacmrV61LWB8Trnx9nS
+9gdVrUteQF/cL1KAGwOsdVmiLpHfvqLLRqSAAQIDAQABAoIBABjB+HEN4Kixf4fk
+wKHKEhL+SF6b/7sFX00NXZ/KLXRhSnnWSMQ8g/1hgMg2P2DfW4FbCDsCUu9xkLvI
+HTZY+CJAIh9U42uaYPWXkt09TmJi76TZ+2Nx4/XvRUjbCm7Fs1I2ekHeUbbAUS5g
++BsPjTnL+h05zLHNoDa5yT0gVGIgFsQcX/w38arZCe8Rjp9le7PXUB5IIqASsDiw
+t8zJvdyWToeXd0WswCHTQu5coHvKo5MCjIZZ1Ink1yJcCCc3rKDc+q3jB2z9T9oW
+cUsKzJ4VuleiYj1eRxFITBmXbjKrb/GPRRUkeqCQbs68Hyj2d3UtOFDPeF4vng/3
+jGsHPq8CgYEA0AHAbwykVC6NMa37BTvEqcKoxbjTtErxR+yczlmVDfma9vkwtZvx
+FJdbS/+WGA/ucDby5x5b2T5k1J9ueMR86xukb+HnyS0WKsZ94Ie8WnJAcbp+38M6
+7LD0u74Cgk93oagDAzUHqdLq9cXxv/ppBpxVB1Uvu8DfVMHj+wt6ie8CgYEA4C7u
+u+6b8EmbGqEdtlPpScKG0WFstJEDGXRARDCRiVP2w6wm25v8UssCPvWcwf8U1Hoq
+lhMY+H6a5dnRRiNYql1MGQAsqMi7VeJNYb0B1uxi7X8MPM+SvXoAglX7wm1z0cVy
+O4CE5sEKbBg6aQabx1x9tzdrm80SKuSsLc5HRQ8CgYEAp/mCKSuQWNru8ruJBwTp
+IB4upN1JOUN77ZVKW+lD0XFMjz1U9JPl77b65ziTQQM8jioRpkqB6cHVM088qxIh
+vssn06Iex/s893YrmPKETJYPLMhqRNEn+JQ+To53ADykY0uGg0SD18SYMbmULHBP
++CKvF6jXT0vGDnA1ZzoxzskCgYEA2nQhYrRS9EVlhP93KpJ+A8gxA5tCCHo+YPFt
+JoWFbCKLlYUNoHZR3IPCPoOsK0Zbj+kz0mXtsUf9vPkR+py669haLQqEejyQgFIz
+QYiiYEKc6/0feapzvXtDP751w7JQaBtVAzJrT0jQ1SCO2oT8C7rPLlgs3fdpOq72
+MPSPcnUCgYBWHm6bn4HvaoUSr0v2hyD9fHZS/wDTnlXVe5c1XXgyKlJemo5dvycf
+HUCmN/xIuO6AsiMdqIzv+arNJdboz+O+bNtS43LkTJfEH3xj2/DdUogdvOgG/iPM
+u9KBT1h+euws7PqC5qt4vqLwCTTCZXmUS8Riv+62RCC3kZ5AbpT3ZA==
+-----END RSA PRIVATE KEY-----
+"""
+
+MASTER_SIGNING_PUB = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtieqrBMTM0MSIbhPKkDc
+ozHqyXKyL/+bXYYw+iVPsns7c7bJzBqenLQlWoRVyrVyBFrrwQSrKu/0Mqn3l639
+iOGPlUoR3I7aZKIpyEdDkqd3xGICe+BtNNDqhUai67L63hEdG+iYAchi8UZw3LZG
+tcGpJ3FkBH4cYFX9EOam2QjbD7WYEO7m1+j6XEYIOTCmAP9dGAvBbU0Jblc+wYxG
+3qNr+2dBWsK76QXWEqib2VSOGP+zgjJa8tqY7PXXdOJpalQXNphmD/4o4pHKR4Eu
+y0yL/1oMkpacmrV61LWB8Trnx9nS9gdVrUteQF/cL1KAGwOsdVmiLpHfvqLLRqSA
+AQIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+MINION_PRIV_KEY = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAsT6TwnlI0L7urjXu6D5E11tFJ/NglQ45jW/WN9tAUNvphq6Q
+cjJCd/aWmdqlqe7ix8y9M/8rgwghRQsnPXblVBvPwFcUEXhMRnOGzqbq/0zyQX01
+KecT0plBhlDt2lTyCLU6E4XCqyLbPfOxgXzsVqM0/TnzRtpVvGNy+5N4eFGylrjb
+cJhPxKt2G9TDOCM/hYacDs5RVIYQQmcYb8LJq7G3++FfWpYRDaxdKoHNFDspEynd
+jzr67hgThnwzc388OKNJx/7B2atwPTunPb3YBjgwDyRO/01OKK4gUHdw5KoctFgp
+kDCDjwjemlyXV+MYODRTIdtOlAP83ZkntEuLoQIDAQABAoIBAAJOKNtvFGfF2l9H
+S4CXZSUGU0a+JaCkR+wmnjsPwPn/dXDpAe8nGpidpNicPWqRm6WABjeQHaxda+fB
+lpSrRtEdo3zoi2957xQJ5wddDtI1pmXJQrdbm0H/K39oIg/Xtv/IZT769TM6OtVg
+paUxG/aftmeGXDtGfIL8w1jkuPABRBLOakWQA9uVdeG19KTU0Ag8ilpJdEX64uFJ
+W75bpVjT+KO/6aV1inuCntQSP097aYvUWajRwuiYVJOxoBZHme3IObcE6mdnYXeQ
+wblyWBpJUHrOS4MP4HCODV2pHKZ2rr7Nwhh8lMNw/eY9OP0ifz2AcAqe3sUMQOKP
+T0qRC6ECgYEAyeU5JvUPOpxXvvChYh6gJ8pYTIh1ueDP0O5e4t3vhz6lfy9DKtRN
+ROJLUorHvw/yVXMR72nT07a0z2VswcrUSw8ov3sI53F0NkLGEafQ35lVhTGs4vTl
+CFoQCuAKPsxeUl4AIbfbpkDsLGQqzW1diFArK7YeQkpGuGaGodXl480CgYEA4L40
+x5cUXnAhTPsybo7sbcpiwFHoGblmdkvpYvHA2QxtNSi2iHHdqGo8qP1YsZjKQn58
+371NhtqidrJ6i/8EBFP1dy+y/jr9qYlZNNGcQeBi+lshrEOIf1ct56KePG79s8lm
+DmD1OY8tO2R37+Py46Nq1n6viT/ST4NjLQI3GyUCgYEAiOswSDA3ZLs0cqRD/gPg
+/zsliLmehTFmHj4aEWcLkz+0Ar3tojUaNdX12QOPFQ7efH6uMhwl8NVeZ6xUBlTk
+hgbAzqLE1hjGBCpiowSZDZqyOcMHiV8ll/VkHcv0hsQYT2m6UyOaDXTH9g70TB6Y
+KOKddGZsvO4cad/1+/jQkB0CgYAzDEEkzLY9tS57M9uCrUgasAu6L2CO50PUvu1m
+Ig9xvZbYqkS7vVFhva/FmrYYsOHQNLbcgz0m0mZwm52mSuh4qzFoPxdjE7cmWSJA
+ExRxCiyxPR3q6PQKKJ0urgtPIs7RlX9u6KsKxfC6OtnbTWWQO0A7NE9e13ZHxUoz
+oPsvWQKBgCa0+Fb2lzUeiQz9bV1CBkWneDZUXuZHmabAZomokX+h/bq+GcJFzZjW
+3kAHwYkIy9IAy3SyO/6CP0V3vAye1p+XbotiwsQ/XZnr0pflSQL3J1l1CyN3aopg
+Niv7k/zBn15B72aK73R/CpUSk9W/eJGqk1NcNwf8hJHsboRYx6BR
+-----END RSA PRIVATE KEY-----
+"""
+
+
+MINION_PUB_KEY = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsT6TwnlI0L7urjXu6D5E
+11tFJ/NglQ45jW/WN9tAUNvphq6QcjJCd/aWmdqlqe7ix8y9M/8rgwghRQsnPXbl
+VBvPwFcUEXhMRnOGzqbq/0zyQX01KecT0plBhlDt2lTyCLU6E4XCqyLbPfOxgXzs
+VqM0/TnzRtpVvGNy+5N4eFGylrjbcJhPxKt2G9TDOCM/hYacDs5RVIYQQmcYb8LJ
+q7G3++FfWpYRDaxdKoHNFDspEyndjzr67hgThnwzc388OKNJx/7B2atwPTunPb3Y
+BjgwDyRO/01OKK4gUHdw5KoctFgpkDCDjwjemlyXV+MYODRTIdtOlAP83ZkntEuL
+oQIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+AES_KEY = "8wxWlOaMMQ4d3yT74LL4+hGrGTf65w8VgrcNjLJeLRQ2Q6zMa8ItY2EQUgMKKDb7JY+RnPUxbB0="
+
+
+@pytest.fixture
+def pki_dir(tmpdir):
+ madir = tmpdir.mkdir("master")
+ mapriv = madir.join("master.pem")
+ mapriv.write(MASTER_PRIV_KEY.strip())
+ mapub = madir.join("master.pub")
+ mapub.write(MASTER_PUB_KEY.strip())
+
+ maspriv = madir.join("master_sign.pem")
+ maspriv.write(MASTER_SIGNING_PRIV.strip())
+ maspub = madir.join("master_sign.pub")
+ maspub.write(MASTER_SIGNING_PUB.strip())
+
+ for sdir in [
+ "minions_autosign",
+ "minions_denied",
+ "minions_pre",
+ "minions_rejected",
+ ]:
+ madir.mkdir(sdir)
+
+ mipub = madir.mkdir("minions").join("minion")
+ mipub.write(MINION_PUB_KEY.strip())
+
+ midir = tmpdir.mkdir("minion")
+ mipub = midir.join("minion.pub")
+ mipub.write(MINION_PUB_KEY.strip())
+ mipriv = midir.join("minion.pem")
+ mipriv.write(MINION_PRIV_KEY.strip())
+ mimapriv = midir.join("minion_master.pub")
+ mimapriv.write(MASTER_PUB_KEY.strip())
+ mimaspriv = midir.join("master_sign.pub")
+ mimaspriv.write(MASTER_SIGNING_PUB.strip())
+ try:
+ yield tmpdir
+ finally:
+ tmpdir.remove()
+
+
+def test_req_server_chan_encrypt_v2(pki_dir):
+ loop = salt.ext.tornado.ioloop.IOLoop.current()
+ opts = {
+ "worker_threads": 1,
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "zmq_monitor": False,
+ "mworker_queue_niceness": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("master")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(opts)
+ dictkey = "pillar"
+ nonce = "abcdefg"
+ pillar_data = {"pillar1": "meh"}
+ ret = server._encrypt_private(pillar_data, dictkey, "minion", nonce)
+ assert "key" in ret
+ assert dictkey in ret
+
+ key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None)
+ if HAS_M2:
+ aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(key)
+ aes = cipher.decrypt(ret["key"])
+ pcrypt = salt.crypt.Crypticle(opts, aes)
+ signed_msg = pcrypt.loads(ret[dictkey])
+
+ assert "sig" in signed_msg
+ assert "data" in signed_msg
+ data = salt.payload.Serial({}).loads(signed_msg["data"])
+ assert "key" in data
+ assert data["key"] == ret["key"]
+ assert "key" in data
+ assert data["nonce"] == nonce
+ assert "pillar" in data
+ assert data["pillar"] == pillar_data
+
+
+def test_req_server_chan_encrypt_v1(pki_dir):
+ loop = salt.ext.tornado.ioloop.IOLoop.current()
+ opts = {
+ "worker_threads": 1,
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "zmq_monitor": False,
+ "mworker_queue_niceness": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("master")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(opts)
+ dictkey = "pillar"
+ nonce = "abcdefg"
+ pillar_data = {"pillar1": "meh"}
+ ret = server._encrypt_private(pillar_data, dictkey, "minion", sign_messages=False)
+
+ assert "key" in ret
+ assert dictkey in ret
+
+ key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None)
+ if HAS_M2:
+ aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(key)
+ aes = cipher.decrypt(ret["key"])
+ pcrypt = salt.crypt.Crypticle(opts, aes)
+ data = pcrypt.loads(ret[dictkey])
+ assert data == pillar_data
+
+
+def test_req_chan_decode_data_dict_entry_v1(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop)
+ dictkey = "pillar"
+ target = "minion"
+ pillar_data = {"pillar1": "meh"}
+ ret = server._encrypt_private(pillar_data, dictkey, target, sign_messages=False)
+ key = client.auth.get_keys()
+ if HAS_M2:
+ aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(key)
+ aes = cipher.decrypt(ret["key"])
+ pcrypt = salt.crypt.Crypticle(client.opts, aes)
+ ret_pillar_data = pcrypt.loads(ret[dictkey])
+ assert ret_pillar_data == pillar_data
+
+
+async def test_req_chan_decode_data_dict_entry_v2(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop)
+
+ dictkey = "pillar"
+ target = "minion"
+ pillar_data = {"pillar1": "meh"}
+
+ # Mock auth and message client.
+ auth = client.auth
+ auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY)
+ client.auth = MagicMock()
+ client.auth.authenticated = True
+ client.auth.get_keys = auth.get_keys
+ client.auth.crypticle.dumps = auth.crypticle.dumps
+ client.auth.crypticle.loads = auth.crypticle.loads
+ client.message_client = MagicMock()
+
+ @salt.ext.tornado.gen.coroutine
+ def mocksend(msg, timeout=60, tries=3):
+ client.message_client.msg = msg
+ load = client.auth.crypticle.loads(msg["load"])
+ ret = server._encrypt_private(
+ pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
+ )
+ raise salt.ext.tornado.gen.Return(ret)
+
+ client.message_client.send = mocksend
+
+ # Note the 'ver' value in 'load' does not represent the the 'version' sent
+ # in the top level of the transport's message.
+ load = {
+ "id": target,
+ "grains": {},
+ "saltenv": "base",
+ "pillarenv": "base",
+ "pillar_override": True,
+ "extra_minion_data": {},
+ "ver": "2",
+ "cmd": "_pillar",
+ }
+ ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",)
+ assert "version" in client.message_client.msg
+ assert client.message_client.msg["version"] == 2
+ assert ret == {"pillar1": "meh"}
+
+
+async def test_req_chan_decode_data_dict_entry_v2_bad_nonce(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop)
+
+ dictkey = "pillar"
+ badnonce = "abcdefg"
+ target = "minion"
+ pillar_data = {"pillar1": "meh"}
+
+ # Mock auth and message client.
+ auth = client.auth
+ auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY)
+ client.auth = MagicMock()
+ client.auth.authenticated = True
+ client.auth.get_keys = auth.get_keys
+ client.auth.crypticle.dumps = auth.crypticle.dumps
+ client.auth.crypticle.loads = auth.crypticle.loads
+ client.message_client = MagicMock()
+ ret = server._encrypt_private(
+ pillar_data, dictkey, target, nonce=badnonce, sign_messages=True
+ )
+
+ @salt.ext.tornado.gen.coroutine
+ def mocksend(msg, timeout=60, tries=3):
+ client.message_client.msg = msg
+ raise salt.ext.tornado.gen.Return(ret)
+
+ client.message_client.send = mocksend
+
+ # Note the 'ver' value in 'load' does not represent the the 'version' sent
+ # in the top level of the transport's message.
+ load = {
+ "id": target,
+ "grains": {},
+ "saltenv": "base",
+ "pillarenv": "base",
+ "pillar_override": True,
+ "extra_minion_data": {},
+ "ver": "2",
+ "cmd": "_pillar",
+ }
+
+ with pytest.raises(salt.crypt.AuthenticationError) as excinfo:
+ ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",)
+ assert "Pillar nonce verification failed." == excinfo.value.message
+
+
+async def test_req_chan_decode_data_dict_entry_v2_bad_signature(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop)
+
+ dictkey = "pillar"
+ badnonce = "abcdefg"
+ target = "minion"
+ pillar_data = {"pillar1": "meh"}
+
+ # Mock auth and message client.
+ auth = client.auth
+ auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY)
+ client.auth = MagicMock()
+ client.auth.authenticated = True
+ client.auth.get_keys = auth.get_keys
+ client.auth.crypticle.dumps = auth.crypticle.dumps
+ client.auth.crypticle.loads = auth.crypticle.loads
+ client.message_client = MagicMock()
+
+ @salt.ext.tornado.gen.coroutine
+ def mocksend(msg, timeout=60, tries=3):
+ client.message_client.msg = msg
+ load = client.auth.crypticle.loads(msg["load"])
+ ret = server._encrypt_private(
+ pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
+ )
+
+ key = client.auth.get_keys()
+ if HAS_M2:
+ aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(key)
+ aes = cipher.decrypt(ret["key"])
+ pcrypt = salt.crypt.Crypticle(client.opts, aes)
+ signed_msg = pcrypt.loads(ret[dictkey])
+ # Changing the pillar data will cause the signature verification to
+ # fail.
+ data = salt.payload.Serial({}).loads(signed_msg["data"])
+ data["pillar"] = {"pillar1": "bar"}
+ signed_msg["data"] = salt.payload.Serial({}).dumps(data)
+ ret[dictkey] = pcrypt.dumps(signed_msg)
+ raise salt.ext.tornado.gen.Return(ret)
+
+ client.message_client.send = mocksend
+
+ # Note the 'ver' value in 'load' does not represent the the 'version' sent
+ # in the top level of the transport's message.
+ load = {
+ "id": target,
+ "grains": {},
+ "saltenv": "base",
+ "pillarenv": "base",
+ "pillar_override": True,
+ "extra_minion_data": {},
+ "ver": "2",
+ "cmd": "_pillar",
+ }
+
+ with pytest.raises(salt.crypt.AuthenticationError) as excinfo:
+ ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",)
+ assert "Pillar payload signature failed to validate." == excinfo.value.message
+
+
+async def test_req_chan_decode_data_dict_entry_v2_bad_key(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop)
+
+ dictkey = "pillar"
+ badnonce = "abcdefg"
+ target = "minion"
+ pillar_data = {"pillar1": "meh"}
+
+ # Mock auth and message client.
+ auth = client.auth
+ auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY)
+ client.auth = MagicMock()
+ client.auth.authenticated = True
+ client.auth.get_keys = auth.get_keys
+ client.auth.crypticle.dumps = auth.crypticle.dumps
+ client.auth.crypticle.loads = auth.crypticle.loads
+ client.message_client = MagicMock()
+
+ @salt.ext.tornado.gen.coroutine
+ def mocksend(msg, timeout=60, tries=3):
+ client.message_client.msg = msg
+ load = client.auth.crypticle.loads(msg["load"])
+ ret = server._encrypt_private(
+ pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
+ )
+
+ key = client.auth.get_keys()
+ if HAS_M2:
+ aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(key)
+ aes = cipher.decrypt(ret["key"])
+ pcrypt = salt.crypt.Crypticle(client.opts, aes)
+ signed_msg = pcrypt.loads(ret[dictkey])
+
+ # Now encrypt with a different key
+ key = salt.crypt.Crypticle.generate_key_string()
+ pcrypt = salt.crypt.Crypticle(opts, key)
+ pubfn = os.path.join(master_opts["pki_dir"], "minions", "minion")
+ pub = salt.crypt.get_rsa_pub_key(pubfn)
+ ret[dictkey] = pcrypt.dumps(signed_msg)
+ key = salt.utils.stringutils.to_bytes(key)
+ if HAS_M2:
+ ret["key"] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding)
+ else:
+ cipher = PKCS1_OAEP.new(pub)
+ ret["key"] = cipher.encrypt(key)
+ raise salt.ext.tornado.gen.Return(ret)
+
+ client.message_client.send = mocksend
+
+ # Note the 'ver' value in 'load' does not represent the the 'version' sent
+ # in the top level of the transport's message.
+ load = {
+ "id": target,
+ "grains": {},
+ "saltenv": "base",
+ "pillarenv": "base",
+ "pillar_override": True,
+ "extra_minion_data": {},
+ "ver": "2",
+ "cmd": "_pillar",
+ }
+
+ with pytest.raises(salt.crypt.AuthenticationError) as excinfo:
+ ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",)
+ assert "Key verification failed." == excinfo.value.message
+
+
+async def test_req_serv_auth_v1(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "master_sign_pubkey": False,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+
+ pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub")))
+ token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string())
+ nonce = uuid.uuid4().hex
+
+ # We need to read the public key with fopen otherwise the newlines might
+ # not match on windows.
+ with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp:
+ pub_key = fp.read()
+
+ load = {
+ "cmd": "_auth",
+ "id": "minion",
+ "token": token,
+ "pub": pub_key,
+ }
+ ret = server._auth(load, sign_messages=False)
+ assert "load" not in ret
+
+
+async def test_req_serv_auth_v2(pki_dir):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "master_sign_pubkey": False,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+
+ pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub")))
+ token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string())
+ nonce = uuid.uuid4().hex
+
+ # We need to read the public key with fopen otherwise the newlines might
+ # not match on windows.
+ with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp:
+ pub_key = fp.read()
+
+ load = {
+ "cmd": "_auth",
+ "id": "minion",
+ "nonce": nonce,
+ "token": token,
+ "pub": pub_key,
+ }
+ ret = server._auth(load, sign_messages=True)
+ assert "sig" in ret
+ assert "load" in ret
+
+
+async def test_req_chan_auth_v2(pki_dir, io_loop):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ master_opts["master_sign_pubkey"] = False
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+ opts["verify_master_pubkey_sign"] = False
+ opts["always_verify_signature"] = False
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop)
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ assert "version" in pload
+ assert pload["version"] == 2
+
+ ret = server._auth(pload["load"], sign_messages=True)
+ assert "sig" in ret
+ ret = client.auth.handle_signin_response(signin_payload, ret)
+ assert "aes" in ret
+ assert "master_uri" in ret
+ assert "publish_port" in ret
+
+
+async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ master_opts["master_sign_pubkey"] = True
+ master_opts["master_use_pubkey_signature"] = False
+ master_opts["signing_key_pass"] = True
+ master_opts["master_sign_key_name"] = "master_sign"
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+ opts["verify_master_pubkey_sign"] = True
+ opts["always_verify_signature"] = True
+ opts["master_sign_key_name"] = "master_sign"
+ opts["master"] = "master"
+
+ assert (
+ pki_dir.join("minion", "minion_master.pub").read()
+ == pki_dir.join("master", "master.pub").read()
+ )
+
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop)
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ assert "version" in pload
+ assert pload["version"] == 2
+
+ server_reply = server._auth(pload["load"], sign_messages=True)
+ # With version 2 we always get a clear signed response
+ assert "enc" in server_reply
+ assert server_reply["enc"] == "clear"
+ assert "sig" in server_reply
+ assert "load" in server_reply
+ ret = client.auth.handle_signin_response(signin_payload, server_reply)
+ assert "aes" in ret
+ assert "master_uri" in ret
+ assert "publish_port" in ret
+
+ # Now create a new master key pair and try auth with it.
+ mapriv = pki_dir.join("master", "master.pem")
+ mapriv.remove()
+ mapriv.write(MASTER2_PRIV_KEY.strip())
+ mapub = pki_dir.join("master", "master.pub")
+ mapub.remove()
+ mapub.write(MASTER2_PUB_KEY.strip())
+
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ server_reply = server._auth(pload["load"], sign_messages=True)
+ ret = client.auth.handle_signin_response(signin_payload, server_reply)
+
+ assert "aes" in ret
+ assert "master_uri" in ret
+ assert "publish_port" in ret
+
+ assert (
+ pki_dir.join("minion", "minion_master.pub").read()
+ == pki_dir.join("master", "master.pub").read()
+ )
+
+
+async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop):
+
+ pki_dir.join("master", "minions", "minion").remove()
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ "acceptance_wait_time": 3,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ master_opts["master_sign_pubkey"] = False
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+ opts["verify_master_pubkey_sign"] = False
+ opts["always_verify_signature"] = False
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop)
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ assert "version" in pload
+ assert pload["version"] == 2
+
+ ret = server._auth(pload["load"], sign_messages=True)
+ assert "sig" in ret
+ ret = client.auth.handle_signin_response(signin_payload, ret)
+ assert ret == "retry"
+
+
+async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_loop):
+
+ pki_dir.join("master", "minions", "minion").remove()
+
+ # Give the master a different key than the minion has.
+ mapriv = pki_dir.join("master", "master.pem")
+ mapriv.remove()
+ mapriv.write(MASTER2_PRIV_KEY.strip())
+ mapub = pki_dir.join("master", "master.pub")
+ mapub.remove()
+ mapub.write(MASTER2_PUB_KEY.strip())
+
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ "acceptance_wait_time": 3,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ master_opts["master_sign_pubkey"] = False
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+ opts["verify_master_pubkey_sign"] = False
+ opts["always_verify_signature"] = False
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop)
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ assert "version" in pload
+ assert pload["version"] == 2
+
+ ret = server._auth(pload["load"], sign_messages=True)
+ assert "sig" in ret
+ with pytest.raises(salt.crypt.SaltClientError, match="Invalid signature"):
+ ret = client.auth.handle_signin_response(signin_payload, ret)
+
+
+async def test_req_chan_auth_v2_new_minion_without_master_pub(pki_dir, io_loop):
+
+ pki_dir.join("master", "minions", "minion").remove()
+ pki_dir.join("minion", "minion_master.pub").remove()
+ mockloop = MagicMock()
+ opts = {
+ "master_uri": "tcp://127.0.0.1:4506",
+ "interface": "127.0.0.1",
+ "ret_port": 4506,
+ "ipv6": False,
+ "sock_dir": ".",
+ "pki_dir": str(pki_dir.join("minion")),
+ "id": "minion",
+ "__role": "minion",
+ "keysize": 4096,
+ "max_minions": 0,
+ "auto_accept": False,
+ "open_mode": False,
+ "key_pass": None,
+ "publish_port": 4505,
+ "auth_mode": 1,
+ "acceptance_wait_time": 3,
+ }
+ SMaster.secrets["aes"] = {
+ "secret": multiprocessing.Array(
+ ctypes.c_char,
+ salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
+ ),
+ "reload": salt.crypt.Crypticle.generate_key_string,
+ }
+ master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
+ master_opts["master_sign_pubkey"] = False
+ server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts)
+ server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
+ server.cache_cli = False
+ server.master_key = salt.crypt.MasterKeys(server.opts)
+ opts["verify_master_pubkey_sign"] = False
+ opts["always_verify_signature"] = False
+ client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop)
+ signin_payload = client.auth.minion_sign_in_payload()
+ pload = client._package_load(signin_payload)
+ assert "version" in pload
+ assert pload["version"] == 2
+
+ ret = server._auth(pload["load"], sign_messages=True)
+ assert "sig" in ret
+ ret = client.auth.handle_signin_response(signin_payload, ret)
+ assert ret == "retry"
diff --git a/tests/unit/transport/mixins.py b/tests/unit/transport/mixins.py
index 540cd33d73..a29182a3f3 100644
--- a/tests/unit/transport/mixins.py
+++ b/tests/unit/transport/mixins.py
@@ -9,6 +9,7 @@ import salt.transport.client
# Import 3rd-party libs
from salt.ext import six
import salt.ext.tornado.gen
+import salt.transport.client
def run_loop_in_thread(loop, evt):
@@ -30,8 +31,7 @@ def run_loop_in_thread(loop, evt):
loop.close()
-class ReqChannelMixin(object):
-
+class ReqChannelMixin:
def test_basic(self):
'''
Test a variety of messages, make sure we get the expected responses
@@ -42,8 +42,8 @@ class ReqChannelMixin(object):
{'baz': 'qux', 'list': [1, 2, 3]},
]
for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- self.assertEqual(ret['load'], msg)
+ ret = self.channel.send(dict(msg), timeout=2, tries=1)
+ self.assertEqual(ret["load"], msg)
def test_normalization(self):
'''
@@ -57,7 +57,7 @@ class ReqChannelMixin(object):
]
for msg in msgs:
ret = self.channel.send(msg, timeout=2, tries=1)
- for k, v in six.iteritems(ret['load']):
+ for k, v in ret["load"].items():
self.assertEqual(types[k], type(v))
def test_badload(self):
@@ -70,7 +70,7 @@ class ReqChannelMixin(object):
self.assertEqual(ret, 'payload and load must be a dict')
-class PubChannelMixin(object):
+class PubChannelMixin:
def test_basic(self):
self.pub = None
diff --git a/tests/unit/transport/test_tcp.py b/tests/unit/transport/test_tcp.py
index 6cc29e1414..edf3d9b12e 100644
--- a/tests/unit/transport/test_tcp.py
+++ b/tests/unit/transport/test_tcp.py
@@ -27,6 +27,13 @@ from salt.transport.tcp import SaltMessageClientPool, SaltMessageClient, TCPPubS
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.helpers import get_unused_localhost_port, flaky
+from salt.ext.tornado.testing import AsyncTestCase, gen_test
+from salt.transport.tcp import (
+ SaltMessageClient,
+ SaltMessageClientPool,
+ TCPPubServerChannel,
+)
+from tests.support.helpers import flaky, slowTest
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.mock import MagicMock, patch
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin, run_loop_in_thread
@@ -61,11 +68,13 @@ class BaseTCPReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
)
cls.minion_config = cls.get_temp_config(
- 'minion',
- **{'transport': 'tcp',
- 'master_ip': '127.0.0.1',
- 'master_port': ret_port,
- 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
+ "minion",
+ **{
+ "transport": "tcp",
+ "master_ip": "127.0.0.1",
+ "master_port": ret_port,
+ "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
+ }
)
cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
@@ -174,12 +183,14 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
)
cls.minion_config = cls.get_temp_config(
- 'minion',
- **{'transport': 'tcp',
- 'master_ip': '127.0.0.1',
- 'auth_timeout': 1,
- 'master_port': ret_port,
- 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
+ "minion",
+ **{
+ "transport": "tcp",
+ "master_ip": "127.0.0.1",
+ "auth_timeout": 1,
+ "master_port": ret_port,
+ "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
+ }
)
cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
@@ -216,17 +227,17 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
del cls.req_server_channel
def setUp(self):
- super(BaseTCPPubCase, self).setUp()
+ super().setUp()
self._start_handlers = dict(self.io_loop._handlers)
def tearDown(self):
- super(BaseTCPPubCase, self).tearDown()
+ super().tearDown()
failures = []
- for k, v in six.iteritems(self.io_loop._handlers):
+ for k, v in self.io_loop._handlers.items():
if self._start_handlers.get(k) != v:
failures.append((k, v))
if failures:
- raise Exception('FDs still attached to the IOLoop: {0}'.format(failures))
+ raise Exception("FDs still attached to the IOLoop: {}".format(failures))
del self.channel
del self._start_handlers
@@ -260,7 +271,7 @@ class AsyncPubChannelTest(BaseTCPPubCase, PubChannelMixin):
class SaltMessageClientPoolTest(AsyncTestCase):
def setUp(self):
- super(SaltMessageClientPoolTest, self).setUp()
+ super().setUp()
sock_pool_size = 5
with patch('salt.transport.tcp.SaltMessageClient.__init__', MagicMock(return_value=None)):
self.message_client_pool = SaltMessageClientPool({'sock_pool_size': sock_pool_size},
@@ -271,7 +282,7 @@ class SaltMessageClientPoolTest(AsyncTestCase):
def tearDown(self):
with patch('salt.transport.tcp.SaltMessageClient.close', MagicMock(return_value=None)):
del self.original_message_clients
- super(SaltMessageClientPoolTest, self).tearDown()
+ super().tearDown()
def test_send(self):
for message_client_mock in self.message_client_pool.message_clients:
@@ -384,66 +395,64 @@ class SaltMessageClientCleanupTest(TestCase, AdaptedConfigurationTestCaseMixin):
class TCPPubServerChannelTest(TestCase, AdaptedConfigurationTestCaseMixin):
- @patch('salt.master.SMaster.secrets')
- @patch('salt.crypt.Crypticle')
- @patch('salt.utils.asynchronous.SyncWrapper')
- def test_publish_filtering(self, sync_wrapper, crypticle, secrets):
- opts = self.get_temp_config('master')
+ @patch("salt.master.SMaster.secrets")
+ @patch("salt.crypt.Crypticle")
+ def test_publish_filtering(self, crypticle, secrets):
+ opts = self.get_temp_config("master")
opts["sign_pub_messages"] = False
channel = TCPPubServerChannel(opts)
- wrap = MagicMock()
crypt = MagicMock()
crypt.dumps.return_value = {"test": "value"}
secrets.return_value = {"aes": {"secret": None}}
crypticle.return_value = crypt
- sync_wrapper.return_value = wrap
# try simple publish with glob tgt_type
- channel.publish({"test": "value", "tgt_type": "glob", "tgt": "*"})
- payload = wrap.send.call_args[0][0]
+ payload = channel.pack_publish(
+ {"test": "value", "tgt_type": "glob", "tgt": "*"}
+ )
# verify we send it without any specific topic
assert "topic_lst" not in payload
# try simple publish with list tgt_type
- channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]})
- payload = wrap.send.call_args[0][0]
+ payload = channel.pack_publish(
+ {"test": "value", "tgt_type": "list", "tgt": ["minion01"]}
+ )
# verify we send it with correct topic
assert "topic_lst" in payload
self.assertEqual(payload["topic_lst"], ["minion01"])
# try with syndic settings
- opts['order_masters'] = True
- channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]})
- payload = wrap.send.call_args[0][0]
+ opts["order_masters"] = True
+ payload = channel.pack_publish(
+ {"test": "value", "tgt_type": "list", "tgt": ["minion01"]}
+ )
# verify we send it without topic for syndics
assert "topic_lst" not in payload
- @patch('salt.utils.minions.CkMinions.check_minions')
- @patch('salt.master.SMaster.secrets')
- @patch('salt.crypt.Crypticle')
- @patch('salt.utils.asynchronous.SyncWrapper')
- def test_publish_filtering_str_list(self, sync_wrapper, crypticle, secrets, check_minions):
- opts = self.get_temp_config('master')
+ @patch("salt.utils.minions.CkMinions.check_minions")
+ @patch("salt.master.SMaster.secrets")
+ @patch("salt.crypt.Crypticle")
+ def test_publish_filtering_str_list(self, crypticle, secrets, check_minions):
+ opts = self.get_temp_config("master")
opts["sign_pub_messages"] = False
channel = TCPPubServerChannel(opts)
- wrap = MagicMock()
crypt = MagicMock()
crypt.dumps.return_value = {"test": "value"}
secrets.return_value = {"aes": {"secret": None}}
crypticle.return_value = crypt
- sync_wrapper.return_value = wrap
check_minions.return_value = {"minions": ["minion02"]}
# try simple publish with list tgt_type
- channel.publish({"test": "value", "tgt_type": "list", "tgt": "minion02"})
- payload = wrap.send.call_args[0][0]
+ payload = channel.pack_publish(
+ {"test": "value", "tgt_type": "list", "tgt": "minion02"}
+ )
# verify we send it with correct topic
assert "topic_lst" in payload
diff --git a/tests/unit/transport/test_zeromq.py b/tests/unit/transport/test_zeromq.py
index a68de7c21c..38921465e7 100644
--- a/tests/unit/transport/test_zeromq.py
+++ b/tests/unit/transport/test_zeromq.py
@@ -400,6 +400,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
ctypes.c_char,
six.b(salt.crypt.Crypticle.generate_key_string()),
),
+ "serial": multiprocessing.Value(ctypes.c_longlong, lock=False),
}
cls.minion_config = cls.get_temp_config(
'minion',
@@ -448,10 +449,14 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
sock.connect(pub_uri)
last_msg = time.time()
serial = salt.payload.Serial(opts)
- crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value)
+ crypticle = salt.crypt.Crypticle(
+ opts, salt.master.SMaster.secrets["aes"]["secret"].value
+ )
+ unpacker = salt.utils.msgpack.Unpacker()
+ stop = False
while time.time() - last_msg < timeout:
try:
- payload = sock.recv(zmq.NOBLOCK)
+ wire_bytes = sock.recv(zmq.NOBLOCK)
except zmq.ZMQError:
time.sleep(.01)
else:
@@ -459,13 +464,21 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
if messages != 1:
messages -= 1
continue
- payload = crypticle.loads(serial.loads(payload)['load'])
- if 'stop' in payload:
- break
- last_msg = time.time()
- results.append(payload['jid'])
- @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
+ unpacker.feed(wire_bytes)
+ for w_payload in unpacker:
+ payload = crypticle.loads(w_payload[b"load"])
+ if not payload:
+ continue
+ if "stop" in payload:
+ stop = True
+ break
+ last_msg = time.time()
+ results.append(payload["jid"])
+ if stop:
+ break
+
+ @slowTest
def test_publish_to_pubserv_ipc(self):
'''
Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
@@ -496,6 +509,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
+ @slowTest
def test_zeromq_publish_port(self):
'''
test when connecting that we
@@ -576,13 +590,14 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
Test sending messags to publisher using UDP
with zeromq_filtering enabled
'''
- opts = dict(self.master_config, ipc_mode='ipc',
- pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5)
- server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
- server_channel.pre_fork(self.process_manager, kwargs={
- 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
- })
- pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
+ opts = dict(
+ self.master_config,
+ ipc_mode="ipc",
+ pub_hwm=0,
+ zmq_filtering=True,
+ acceptance_wait_time=5,
+ )
+ pub_uri = "tcp://{interface}:{publish_port}".format(**opts)
send_num = 1
expect = []
results = []
@@ -590,20 +605,47 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
args=(self.minion_config, pub_uri, results,),
kwargs={'messages': 2})
gather.start()
- # Allow time for server channel to start, especially on windows
- time.sleep(2)
- expect.append(send_num)
- load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num}
- with patch('salt.utils.minions.CkMinions.check_minions',
- MagicMock(return_value={'minions': ['minion'], 'missing': [],
- 'ssh_minions': False})):
- server_channel.publish(load)
- server_channel.publish(
- {'tgt_type': 'glob', 'tgt': '*', 'stop': True}
- )
- gather.join()
- server_channel.pub_close()
- assert len(results) == send_num, (len(results), set(expect).difference(results))
+ with patch(
+ "salt.utils.minions.CkMinions.check_minions",
+ MagicMock(
+ return_value={
+ "minions": ["minion"],
+ "missing": [],
+ "ssh_minions": False,
+ }
+ ),
+ ):
+ # Allow time for server channel to start, especially on windows
+ time.sleep(2)
+ server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
+ server_channel.pre_fork(
+ self.process_manager,
+ kwargs={
+ "log_queue": salt.log.setup.get_multiprocessing_logging_queue()
+ },
+ )
+ time.sleep(2)
+ expect.append(send_num)
+ load = {"tgt_type": "glob", "tgt": "*", "jid": send_num}
+ with patch(
+ "salt.utils.minions.CkMinions.check_minions",
+ MagicMock(
+ return_value={
+ "minions": ["minion"],
+ "missing": [],
+ "ssh_minions": False,
+ }
+ ),
+ ):
+ server_channel.publish(load)
+ server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
+ time.sleep(0.3)
+ server_channel.pub_close()
+ gather.join()
+ assert len(results) == send_num, (
+ len(results),
+ set(expect).difference(results),
+ )
def test_publish_to_pubserv_tcp(self):
'''
@@ -636,7 +678,8 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)}
server_channel.publish(load)
- server_channel.close()
+ time.sleep(0.3)
+ server_channel.pub_close()
@staticmethod
def _send_large(opts, sid, num=10, size=250000 * 3):
@@ -644,7 +687,8 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size}
server_channel.publish(load)
- server_channel.close()
+ time.sleep(0.3)
+ server_channel.pub_close()
def test_issue_36469_tcp(self):
'''
@@ -652,19 +696,23 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
https://github.com/saltstack/salt/issues/36469
'''
- opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
- server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
- server_channel.pre_fork(self.process_manager, kwargs={
- 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
- })
+ opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
send_num = 10 * 4
expect = []
results = []
pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
# Allow time for server channel to start, especially on windows
- time.sleep(2)
- gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
+ gather = threading.Thread(
+ target=self._gather_results, args=(self.minion_config, pub_uri, results,)
+ )
gather.start()
+ time.sleep(2)
+ server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
+ server_channel.pre_fork(
+ self.process_manager,
+ kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
+ )
+ time.sleep(2)
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(self._send_small, opts, 1)
executor.submit(self._send_small, opts, 2)
--
2.35.1