File support-paramiko-4.patch of Package python-pyinfra
From 227490a3582c667586cb1930a0ba82900c147c2b Mon Sep 17 00:00:00 2001
From: Whyme Lyu <callme5long@gmail.com>
Date: Mon, 12 Jan 2026 14:47:51 +0800
Subject: [PATCH] dependencies/paramiko: support paramiko v4, remove DSS key
support
These keys are so out of date that this does not warrant a breaking
change version bump.
---
pyproject.toml | 3 +-
src/pyinfra/connectors/ssh_util.py | 5 +-
tests/test_connectors/test_ssh.py | 120 +----------------------------
uv.lock | 4 +-
4 files changed, 8 insertions(+), 124 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index 3e8aab3b9..17aa94a03 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,7 @@ license-files = ["LICENSE.md"]
requires-python = ">=3.10,<4.0"
dependencies = [
"gevent>=1.5",
- "paramiko>=2.7,<4", # 2.7 (2019) adds OpenSSH key format + Match SSH config
+ "paramiko>=2.7,<5", # 2.7 (2019) adds OpenSSH key format + Match SSH config
"click>2",
"jinja2>3,<4",
"python-dateutil>2,<3",
@@ -22,6 +22,7 @@ dependencies = [
"packaging>=16.1",
"pydantic>=2.11,<3",
"typing-extensions; python_version < '3.11'", # Backport of typing for Unpack (added 3.11)
+ "types-paramiko>=2.7,<5",
]
classifiers = [
"Development Status :: 5 - Production/Stable",
diff --git a/src/pyinfra/connectors/ssh_util.py b/src/pyinfra/connectors/ssh_util.py
index a70685880..5d4bf7997 100644
--- a/src/pyinfra/connectors/ssh_util.py
+++ b/src/pyinfra/connectors/ssh_util.py
@@ -3,7 +3,6 @@
from typing import TYPE_CHECKING, Type, Union
from paramiko import (
- DSSKey,
ECDSAKey,
Ed25519Key,
PasswordRequiredException,
@@ -28,9 +27,9 @@ def raise_connect_error(host: "Host", message, data):
def _load_private_key_file(filename: str, key_filename: str, key_password: str):
exception: Union[PyinfraError, SSHException] = PyinfraError("Invalid key: {0}".format(filename))
- key_cls: Union[Type[RSAKey], Type[DSSKey], Type[ECDSAKey], Type[Ed25519Key]]
+ key_cls: Union[Type[RSAKey], Type[ECDSAKey], Type[Ed25519Key]]
- for key_cls in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
+ for key_cls in (RSAKey, ECDSAKey, Ed25519Key):
try:
return key_cls.from_private_key_file(
filename=filename,
diff --git a/tests/test_connectors/test_ssh.py b/tests/test_connectors/test_ssh.py
index 0aad2c6cb..86391eb63 100644
--- a/tests/test_connectors/test_ssh.py
+++ b/tests/test_connectors/test_ssh.py
@@ -313,10 +313,6 @@ def test_connect_with_rsa_ssh_key_wrong_password(self):
with (
mock.patch("pyinfra.connectors.ssh_util.path.isfile", lambda *args, **kwargs: True),
- mock.patch(
- "pyinfra.connectors.ssh_util.DSSKey.from_private_key_file",
- fake_fail_from_private_key_file,
- ),
mock.patch(
"pyinfra.connectors.ssh_util.ECDSAKey.from_private_key_file",
fake_fail_from_private_key_file,
@@ -345,121 +341,7 @@ def fake_key_open_fail(*args, **kwargs):
assert e.exception.args[0] == "Invalid private key file: testkey"
- assert fake_fail_from_private_key_file.call_count == 3
-
- def test_connect_with_dss_ssh_key(self):
- state = State(make_inventory(hosts=(("somehost", {"ssh_key": "testkey"}),)), Config())
-
- with (
- mock.patch("pyinfra.connectors.ssh_util.path.isfile", lambda *args, **kwargs: True),
- mock.patch(
- "pyinfra.connectors.ssh_util.RSAKey.from_private_key_file",
- ) as fake_rsa_key_open,
- mock.patch(
- "pyinfra.connectors.ssh_util.DSSKey.from_private_key_file",
- ) as fake_key_open,
- ): # noqa
- fake_rsa_key_open.side_effect = make_raise_exception_function(SSHException)
-
- fake_key = mock.MagicMock()
- fake_key_open.return_value = fake_key
-
- connect_all(state)
-
- # Check the key was created properly
- fake_key_open.assert_called_with(filename="testkey")
-
- # And check the Paramiko SSH call was correct
- self.fake_connect_mock.assert_called_with(
- "somehost",
- allow_agent=False,
- look_for_keys=False,
- pkey=fake_key,
- timeout=10,
- username="vagrant",
- _pyinfra_ssh_forward_agent=False,
- _pyinfra_ssh_config_file=None,
- _pyinfra_ssh_known_hosts_file=None,
- _pyinfra_ssh_strict_host_key_checking="accept-new",
- _pyinfra_ssh_paramiko_connect_kwargs=None,
- )
-
- # Check that loading the same key again is cached in the state
- second_state = State(
- make_inventory(hosts=(("somehost", {"ssh_key": "testkey"}),)),
- Config(),
- )
- second_state.private_keys = state.private_keys
-
- connect_all(second_state)
-
- def test_connect_with_dss_ssh_key_password(self):
- state = State(
- make_inventory(
- hosts=(
- (
- "somehost",
- {"ssh_key": "testkey", "ssh_key_password": "testpass"},
- ),
- ),
- ),
- Config(),
- )
-
- with (
- mock.patch("pyinfra.connectors.ssh_util.path.isfile", lambda *args, **kwargs: True),
- mock.patch(
- "pyinfra.connectors.ssh_util.RSAKey.from_private_key_file",
- ) as fake_rsa_key_open,
- mock.patch(
- "pyinfra.connectors.ssh_util.DSSKey.from_private_key_file",
- ) as fake_dss_key_open,
- ): # noqa
-
- def fake_rsa_key_open_fail(*args, **kwargs):
- if "password" not in kwargs:
- raise PasswordRequiredException
- raise SSHException
-
- fake_rsa_key_open.side_effect = fake_rsa_key_open_fail
-
- fake_dss_key = mock.MagicMock()
-
- def fake_dss_key_func(*args, **kwargs):
- if "password" not in kwargs:
- raise PasswordRequiredException
- return fake_dss_key
-
- fake_dss_key_open.side_effect = fake_dss_key_func
-
- connect_all(state)
-
- # Check the key was created properly
- fake_dss_key_open.assert_called_with(filename="testkey", password="testpass")
-
- # And check the Paramiko SSH call was correct
- self.fake_connect_mock.assert_called_with(
- "somehost",
- allow_agent=False,
- look_for_keys=False,
- pkey=fake_dss_key,
- timeout=10,
- username="vagrant",
- _pyinfra_ssh_forward_agent=False,
- _pyinfra_ssh_config_file=None,
- _pyinfra_ssh_known_hosts_file=None,
- _pyinfra_ssh_strict_host_key_checking="accept-new",
- _pyinfra_ssh_paramiko_connect_kwargs=None,
- )
-
- # Check that loading the same key again is cached in the state
- second_state = State(
- make_inventory(hosts=(("somehost", {"ssh_key": "testkey"}),)),
- Config(),
- )
- second_state.private_keys = state.private_keys
-
- connect_all(second_state)
+ assert fake_fail_from_private_key_file.call_count == 2
def test_connect_with_missing_ssh_key(self):
state = State(make_inventory(hosts=(("somehost", {"ssh_key": "testkey"}),)), Config())
diff --git a/uv.lock b/uv.lock
index 81e0372ef..14f8eb9a9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1262,6 +1262,7 @@ dependencies = [
{ name = "pydantic" },
{ name = "python-dateutil" },
{ name = "typeguard" },
+ { name = "types-paramiko" },
{ name = "typing-extensions", marker = "python_full_version < '3.11'" },
]
@@ -1318,10 +1319,11 @@ requires-dist = [
{ name = "gevent", specifier = ">=1.5" },
{ name = "jinja2", specifier = ">3,<4" },
{ name = "packaging", specifier = ">=16.1" },
- { name = "paramiko", specifier = ">=2.7,<4" },
+ { name = "paramiko", specifier = ">=2.7,<5" },
{ name = "pydantic", specifier = ">=2.11,<3" },
{ name = "python-dateutil", specifier = ">2,<3" },
{ name = "typeguard", specifier = ">=4,<5" },
+ { name = "types-paramiko", specifier = ">=2.7,<5" },
{ name = "typing-extensions", marker = "python_full_version < '3.11'" },
]