File proxmoxer-2.2.0.obscpio of Package python-proxmoxer
07070100000000000081A4000000000000000000000001675E3B1A00000058000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.banditskips:
- B105
- B106
assert_used:
skips:
- '*/*_test.py'
- '*/test_*.py'
07070100000001000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001E00000000proxmoxer-2.2.0/.devcontainer07070100000002000081A4000000000000000000000001675E3B1A0000033B000000000000000000000000000000000000002900000000proxmoxer-2.2.0/.devcontainer/Dockerfile# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.166.1/containers/python-3/.devcontainer/base.Dockerfile
# [Choice] Python version: 3, 3.11, 3.10, 3.9, 3.8, 3.7, 3.6
ARG VARIANT="3"
FROM mcr.microsoft.com/vscode/devcontainers/python:${VARIANT}
# [Optional] If your pip requirements rarely change, uncomment this section to add them to the image.
COPY test_requirements.txt dev_requirements.txt /tmp/pip-tmp/
RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/test_requirements.txt -r /tmp/pip-tmp/dev_requirements.txt \
&& rm -rf /tmp/pip-tmp
# [Optional] Uncomment this section to install additional OS packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here>
07070100000003000081A4000000000000000000000001675E3B1A000007B7000000000000000000000000000000000000003000000000proxmoxer-2.2.0/.devcontainer/devcontainer.json// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Proxmoxer Development",
"build": {
"dockerfile": "Dockerfile",
"context": "..",
"args": {
// Update 'VARIANT' to pick a Python version: 3, 3.6, 3.7, 3.8, 3.9, 3.10, 3.11
"VARIANT": "3.8"
}
},
// Set *default* container specific settings.json values on container create.
"customizations": {
"vscode": {
"settings": {
"terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python",
"python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
"python.formatting.blackPath": "/usr/local/py-utils/bin/black",
"python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
"python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
"python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
"python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
"python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
"python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
"python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
},
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"mhutchie.git-graph",
"ms-python.python",
"njpwerner.autodocstring",
"ryanluker.vscode-coverage-gutters",
"streetsidesoftware.code-spell-checker"
]
}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Run commands to prepare the container for use
"postCreateCommand": ".devcontainer/setup.sh",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode"
}
07070100000004000081ED000000000000000000000001675E3B1A00000184000000000000000000000000000000000000002700000000proxmoxer-2.2.0/.devcontainer/setup.sh#!/bin/bash
# install proxmoxer as an editable package
pip3 install -e .
rm -rf proxmoxer.egg-info/
# hide the mass-formatting commits from git blames
git config blame.ignorerevsfile .git-blame-ignore-revs
# install the git hook for pre-commit
pre-commit install
# run pre-commit on a simple file to ensure it downloads all needed tools
pre-commit run --files .pre-commit-config.yaml
07070100000005000081A4000000000000000000000001675E3B1A0000008C000000000000000000000000000000000000002700000000proxmoxer-2.2.0/.git-blame-ignore-revs# use with `git config blame.ignorerevsfile .git-blame-ignore-revs`
# Format code base with Black
7a976de985fc7b71fdf31d3161f223eeaada38da
07070100000006000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.github07070100000007000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002200000000proxmoxer-2.2.0/.github/workflows07070100000008000081A4000000000000000000000001675E3B1A00000786000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/.github/workflows/ci.yamlname: CI
on:
push:
pull_request:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
unit-test:
continue-on-error: ${{ github.repository == 'proxmoxer/proxmoxer' }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Cache PIP packages
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-python${{ matrix.python-version }}-${{ hashFiles('*requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-python${{ matrix.python-version }}-
${{ runner.os }}-pip-
- name: Install pip Packages
run: pip install -r test_requirements.txt
- name: Install Self as Package
run: pip install .
- name: Run Tests
run: pytest -v --cov tests/
- name: Run pre-commit lint/format checks
uses: pre-commit/action@v3.0.0
- name: Upload coverage data to coveralls.io
if: github.repository == 'proxmoxer/proxmoxer'
run: coveralls --service=github
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
COVERALLS_FLAG_NAME: Unit Test (${{ matrix.python-version }})
COVERALLS_PARALLEL: true
complete:
name: Finalize Coveralls Report
if: github.repository == 'proxmoxer/proxmoxer'
needs: unit-test
runs-on: ubuntu-latest
steps:
- name: Coveralls Finished
uses: coverallsapp/github-action@1.1.3
with:
parallel-finished: true
github-token: ${{ secrets.GITHUB_TOKEN }}
07070100000009000081A4000000000000000000000001675E3B1A00000841000000000000000000000000000000000000001B00000000proxmoxer-2.2.0/.gitignore# IDE files
.idea
*.code-workspace
coverage.*
# generated files
README.txt
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
0707010000000A000081A4000000000000000000000001675E3B1A0000060B000000000000000000000000000000000000002800000000proxmoxer-2.2.0/.pre-commit-config.yamlrepos:
###### FORMATTING ######
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 23.11.0
hooks:
- id: black
language_version: python3 # Should be a command that runs python3.6+
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
- id: isort
name: isort (pyi)
types: [pyi]
###### LINTING ######
- repo: https://github.com/PyCQA/bandit
rev: 1.7.5
hooks:
- id: bandit
args: ["--configfile", ".bandit", "--baseline", "tests/known_issues.json"]
- repo: https://github.com/PyCQA/flake8
rev: 6.1.0
hooks:
- id: flake8
# any flake8 plugins must be included in the hook venv
# additional_dependencies: [flake8-docstrings]
# - repo: https://github.com/PyCQA/pylint
# rev: v2.8.2
# hooks:
# - id: pylint
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: check-case-conflict
- id: check-symlinks
- id: destroyed-symlinks
- id: check-merge-conflict
- id: check-docstring-first
- id: mixed-line-ending
args: [--fix=no]
- repo: https://github.com/asottile/blacken-docs
rev: 1.16.0
hooks:
- id: blacken-docs
additional_dependencies: [black==23.11.0]
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
- id: python-no-eval
- id: rst-backticks
- id: rst-directive-colons
- id: rst-inline-touching-normal
0707010000000B000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.vscode0707010000000C000081A4000000000000000000000001675E3B1A00000419000000000000000000000000000000000000002600000000proxmoxer-2.2.0/.vscode/settings.json{
"python.linting.enabled": true,
"python.formatting.provider": "black",
"editor.formatOnPaste": false,
"python.linting.flake8Enabled": true,
"python.linting.pylintEnabled": true,
"python.linting.banditEnabled": true,
"python.linting.banditArgs": [
"--baseline",
"tests/known_issues.json",
"--configfile",
".bandit"
],
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
}
},
"cSpell.words": [
"auths",
"Butovich",
"caplog",
"cpus",
"Oleg",
"onboot",
"ostemplate",
"Paramiko",
"proxmoxer",
"pvesh",
"resps",
"rtype",
"sess",
"toolbelt",
"vmid",
"vztmpl"
],
"autoDocstring.docstringFormat": "sphinx",
"autoDocstring.startOnNewLine": true,
"python.testing.pytestArgs": [
"--cov",
"--cov-report",
"xml:coverage.xml",
"tests/",
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"coverage-gutters.coverageFileNames": [
"coverage.xml"
],
}
0707010000000D000081A4000000000000000000000001675E3B1A00000AEA000000000000000000000000000000000000002300000000proxmoxer-2.2.0/.vscode/tasks.json{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "Run Tests (with coverage file)",
"type": "shell",
"command": "pytest -v --cov --cov-report xml:coverage.xml tests/",
"problemMatcher": [],
"icon": {
"id": "beaker",
"color": "terminal.ansiGreen"
},
"runOptions": {
"instanceLimit": 1
},
"group": {
"kind": "test",
"isDefault": true
},
"presentation": {
"echo": true,
"reveal": "silent",
"focus": false,
"panel": "dedicated",
"showReuseMessage": false,
"clear": true
},
},
{
"label": "Run Tests (with coverage)",
"type": "shell",
"command": "pytest --cov tests/",
"problemMatcher": [],
"icon": {
"id": "beaker",
"color": "terminal.ansiGreen"
},
"runOptions": {
"instanceLimit": 1
},
"group": {
"kind": "test",
"isDefault": false
},
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "dedicated",
"showReuseMessage": true,
"clear": true
},
},
{
"label": "Run Tests",
"type": "shell",
"command": "pytest -v tests/",
"problemMatcher": [],
"icon": {
"id": "beaker",
},
"group": {
"kind": "test"
},
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "dedicated",
"showReuseMessage": false,
"clear": true
},
},
{
"label": "Update bandit baseline",
"type": "shell",
"command": "bandit --configfile .bandit -f json -r tests/ proxmoxer/ >| tests/known_issues.json",
"problemMatcher": [],
"runOptions": {
"instanceLimit": 1
},
"group": {
"kind": "none"
},
"icon": {
"id": "bookmark"
},
"presentation": {
"echo": true,
"reveal": "never",
"focus": false,
"panel": "shared",
"showReuseMessage": false,
"clear": false
},
},
{
"label": "Clean Cache/tmp files",
"type": "shell",
"command": "rm -rf ./.mypy_cache/ ./.pytest_cache/ ./dist/ ./coverage.xml ./.coverage README.txt",
"problemMatcher": [],
"group": {
"kind": "none"
},
"icon": {
"id": "trashcan"
},
"presentation": {
"echo": true,
"reveal": "never",
"focus": false,
"panel": "shared",
"showReuseMessage": false,
"clear": false
},
}
]
}
0707010000000E000081A4000000000000000000000001675E3B1A0000240B000000000000000000000000000000000000001D00000000proxmoxer-2.2.0/CHANGELOG.md## 2.2.0 (2024-12-13)
* Bugfix (local,openssh,paramiko): Remove IP/hostname from command path ([Andrea Dainese](https://github.com/dainok), [John Hollowell](https://github.com/jhollowe))
* Addition (https): Allow passing certificate for TLS verification ([gdelaneync](https://github.com/gdelaneync))
* Bugfix (local,openssh,paramiko): Prevent a returned task ID (UPID) from throwing an error ([Adam Dorsey](https://github.com/asdorsey), [John Hollowell](https://github.com/jhollowe))
* Improvement (local,openssh,paramiko): Attempt to encode binary payloads as UTF-8 before sending/JSON-ing ([Adam Dorsey](https://github.com/asdorsey))
## 2.1.0 (2024-08-10)
* Improvement (docs): Update Readme with updated example ([Rob Wolinski](https://github.com/trekie86))
* Addition (tools): Added Files tools ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Add repr to some classes and add to tests ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Correct metadata to match supported Python versions (3.6+) ([Alexei Znamensky](https://github.com/russoz))
* Bugfix (https): Fix BytesWarning when logging response status/content ([Walter Doekes](https://github.com/wdoekes))
* Improvement (meta): Update devcontainer to modern unified schema ([John Hollowell](https://github.com/jhollowe))
* Improvement (meta): Add 3.12 to CI matrix, remove 3.7 testing ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Fix improper splitting of non-exec QEMU commands ([John Hollowell](https://github.com/jhollowe))
## 2.0.1 (2022-12-19)
* Bugfix (https): properly pass verify_ssl all the way to the backend auth ([Dominik Rimpf](https://github.com/domrim))
## 2.0.0 (2022-11-27)
* Improvement (all): Convert testing framework to use pytest ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Remove Python 2.x support (minimum version of 3.7) ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Refactor code to Python 3 standards ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Remove None values from request data and params ([Kristian Heljas](https://github.com/kristianheljas))
* Addition (tools): Added Task tools ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Allow specifying resource_id as 0 ([John Bergvall](https://github.com/johnbergvall))
* Improvement (all): Remove ProxmoxResourceBase ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Add platform detection before using shlex functions ([Kevin Boyd](https://github.com/r3d07))
* Improvement (https): Added `path_prefix` argument which is appended after the root of the URL (before `api2/`) ([John Hollowell](https://github.com/jhollowe))
### Breaking Changes
* `ProxmoxResourceBase` removed
* `proxmoxer.backends.https.AuthenticationError` moved to `proxmoxer.AuthenticationError`
* Removed `ProxmoxHTTPTicketAuth` and its arguments `auth_token` and `csrf_token`
* keyword arguments to backends order changed (should not affect users specifying arguments by name)
## 1.3.1 (2022-05-14)
* Bugfix (all): fix error handling for APIs that don't give a dict in the response ([Alex Wuillaume](https://github.com/wuillaumea))
## 1.3.0 (2022-03-13)
* Addition (local): Added `local` backend for running directly on Proxmox hosts. ([Markus Reiter](https://github.com/reitermarkus))
* Bugfix (all): properly parse command string sent to QEMU guest agent ([John Hollowell](https://github.com/jhollowe))
* Improvement (command_base): Refactor code to have a unified CLI backend base for `openssh`, `ssh_paramiko`, and `local` backends ([Markus Reiter](https://github.com/reitermarkus))
* Improvement (https): Support IPv6 addresses ([Daviddcc](https://github.com/dcasier))
* Improvement: Move CI to GitHub actions from Travis.ci ([John Hollowell](https://github.com/jhollowe))
* Improvement: Cleanup documentation and move to dedicated site ([John Hollowell](https://github.com/jhollowe))
* Improvement: Add `pre-commit` hooks for formatting and linting and format all code ([John Hollowell](https://github.com/jhollowe))
## 1.2.0 (2021-10-07)
* Addition (https): Added OTP code support to authentication ([John Hollowell](https://github.com/jhollowe))
* Addition (https): Added support for large file uploads using requests_toolbelt module ([John Hollowell](https://github.com/jhollowe))
* Addition (all): Added support for Proxmox Mail Gateway (PMG) and Proxmox Backup Server (PBS) with parameter validation ([Gabriel Cardoso de Faria](https://github.com/gabrielcardoso21), [John Hollowell](https://github.com/jhollowe))
* Addition (all): Added detailed information to ResourceException ([mihailstoynov](https://github.com/mihailstoynov))
* Bugfix (base_ssh): Resolved issue with values containing spaces by encapsulating values in quotes ([mihailstoynov](https://github.com/mihailstoynov))
* Bugfix (all): Resolved issue with using get/post/push/delete on a base ProxmoxAPI object ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Added support for responses which are not JSON ([John Hollowell](https://github.com/jhollowe))
* Improvement: Added and updated documentation ([Ananias Filho](https://github.com/ananiasfilho), [Thomas Baag](https://github.com/b2ag))
* Improvement: Tests are now not installed when using PIP ([Ville Skyttä](https://github.com/scop))
* Addition: Devcontainer definition now available to make development easier ([John Hollowell](https://github.com/jhollowe))
## 1.1.1 (2020-06-23)
* Bugfix (https): correctly renew ticket in the session, not just the auth ([John Hollowell](https://github.com/jhollowe))
## 1.1.0 (2020-05-22)
* Addition (https): Added API Token authentication ([John Hollowell](https://github.com/jhollowe))
* Improvement (https): user/password authentication refreshes ticket to prevent expiration ([CompileNix](https://github.com/compilenix), [John Hollowell](https://github.com/jhollowe))
* Bugfix (ssh_paramiko): Handle empty stderr from ssh connections ([morph027](https://github.com/morph027))
* DEPRECATED (https): using ``auth_token`` and ``csrf_token`` (ProxmoxHTTPTicketAuth) is now deprecated. Either pass the ``auth_token`` as the ``password`` or use the API Tokens.
## 1.0.4 (2020-01-24)
* Improvement (https): Added timeout to authentication (James Lin)
* Improvement (https): Handle AnyEvent::HTTP status codes gracefully (Georges Martin)
* Improvement (https): Advanced error message with error code >=400 ([ssi444](https://github.com/ssi444))
* Bugfix (ssh): Fix pvesh output format for version > 5.3 ([timansky](https://github.com/timansky))
* Transferred development to proxmoxer organization
## 1.0.3 (2018-09-10)
* Improvement (https): Added option to specify port in hostname parameter ([pvanagtmaal](https://github.com/pvanagtmaal))
* Improvement: Added stderr to the Response content ([Jérôme Schneider](https://github.com/merinos))
* Bugfix (ssh_paramiko): Paramiko python3: stdout and stderr must be a str not bytes ([Jérôme Schneider](https://github.com/merinos))
* New lxc example in documentation ([Geert Stappers](https://github.com/stappersg))
## 1.0.2 (2017-12-02)
* Tarball repackaged with tests
## 1.0.1 (2017-12-02)
* LICENSE file now included in tarball
* Added verify_ssl parameter to ProxmoxHTTPAuth ([Walter Doekes](https://github.com/wdoekes))
## 1.0.0 (2017-11-12)
* Update Proxmoxer readme ([Emmanuel Kasper](https://github.com/EmmanuelKasper))
* Display the reason of API calls errors ([Emmanuel Kasper](https://github.com/EmmanuelKasper), [kantsdog](https://github.com/kantsdog))
* Filter for ssh response code ([Chris Plock](https://github.com/chrisplo))
## 0.2.5 (2017-02-12)
* Adding sudo to execute CLI with paramiko ssh backend ([Jason Meridth](https://github.com/jmeridth))
* Proxmoxer/backends/ssh_paramiko: improve file upload ([Jérôme Schneider](https://github.com/merinos))
## 0.2.4 (2016-05-02)
* Removed newline in tmp_filename string ([Jérôme Schneider](https://github.com/merinos))
* Fix to avoid module reloading ([jklang](https://github.com/jklang))
## 0.2.3 (2016-01-20)
* Minor typo fix ([Srinivas Sakhamuri](https://github.com/srsakhamuri))
## 0.2.2 (2016-01-19)
* Adding sudo to execute pvesh CLI in openssh backend ([Wei Tie](https://github.com/TieWei), [Srinivas Sakhamuri](https://github.com/srsakhamuri))
* Add support to specify an identity file for ssh connections ([Srinivas Sakhamuri](https://github.com/srsakhamuri))
## 0.2.1 (2015-05-02)
* fix for python 3.4 ([kokuev](https://github.com/kokuev))
## 0.2.0 (2015-03-21)
* Https will now raise AuthenticationError when appropriate. ([scap1784](https://github.com/scap1784))
* Preliminary python 3 compatibility. ([wdoekes](https://github.com/wdoekes))
* Additional example. ([wdoekes](https://github.com/wdoekes))
## 0.1.7 (2014-11-16)
* Added ignore of "InsecureRequestWarning: Unverified HTTPS request is being made..." warning while using https (requests) backend.
## 0.1.4 (2013-06-01)
* Added logging
* Added openssh backend
* Tests are reorganized
## 0.1.3 (2013-05-30)
* Added next tests
* Bugfixes
## 0.1.2 (2013-05-27)
* Added first tests
* Added support for travis and coveralls
* Bugfixes
## 0.1.1 (2013-05-13)
* Initial try.
0707010000000F000081A4000000000000000000000001675E3B1A00000431000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/LICENSE.txtThe MIT License
Copyright (c) 2013 Oleg Butovich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.07070100000010000081A4000000000000000000000001675E3B1A00000077000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/MANIFEST.ininclude LICENSE.txt
include README.txt
include README.rst
include CHANGELOG.md
global-exclude *.orig *.pyc *.log *.swp
07070100000011000081A4000000000000000000000001675E3B1A00001004000000000000000000000000000000000000001B00000000proxmoxer-2.2.0/README.rst================================================
Proxmoxer: A Python wrapper for Proxmox REST API
================================================
master branch: |master_build_status| |master_coverage_status| |pypi_version| |pypi_downloads|
develop branch: |develop_build_status| |develop_coverage_status|
Proxmoxer is a python wrapper around the `Proxmox REST API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_.
It currently supports the Proxmox services of Proxmox Virtual Environment (PVE), Proxmox Mail Gateway (PMG), and Proxmox Backup Server (PBS).
It was inspired by slumber, but it is dedicated only to Proxmox. It allows not only REST API use over HTTPS, but
the same api over ssh and pvesh utility.
Like `Proxmoxia <https://github.com/baseblack/Proxmoxia>`_, it dynamically creates attributes which responds to the
attributes you've attempted to reach.
Full Documentation is available at https://proxmoxer.github.io/docs/
--------------------------------------------------------------------
Migrating to version 2
......................
Full instructions for the minimal steps needed to update to version 2 can be found in `Migration Docs <https://proxmoxer.github.io/docs/latest/v1_migration/>`_.
Installation
............
.. code-block:: bash
pip install proxmoxer
To use the 'https' backend, install requests
.. code-block:: bash
pip install requests
To use the 'ssh_paramiko' backend, install paramiko
.. code-block:: bash
pip install paramiko
To use the 'openssh' backend, install openssh_wrapper
.. code-block:: bash
pip install openssh_wrapper
Short usage information
.......................
The first thing to do is import the proxmoxer library and create ProxmoxAPI instance.
.. code-block:: python
from proxmoxer import ProxmoxAPI
proxmox = ProxmoxAPI(
"proxmox_host", user="admin@pam", password="secret_word", verify_ssl=False
)
This will connect by default to PVE through the 'https' backend.
**Note: ensure you have the required libraries (listed above) for the connection method you are using**
Queries are exposed via the access methods **get**, **post**, **put** and **delete**. For convenience two
synonyms are available: **create** for **post**, and **set** for **put**.
Using the paths from the `PVE API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_, you can create
API calls using the access methods above.
.. code-block:: pycon
>>> for node in proxmox.nodes.get():
... for vm in proxmox.nodes(node["node"]).qemu.get():
... print(f"{vm['vmid']}. {vm['name']} => {vm['status']}")
...
141. puppet-2.london.example.com => running
101. munki.london.example.com => running
102. redmine.london.example.com => running
140. dns-1.london.example.com => running
126. ns-3.london.example.com => running
113. rabbitmq.london.example.com => running
See Changelog in `CHANGELOG.md <https://github.com/proxmoxer/proxmoxer/blob/develop/CHANGELOG.md>`_
...................................................................................................
.. |master_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=master
:target: https://github.com/proxmoxer/proxmoxer/actions
.. |master_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/master
:target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=master
.. |develop_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=develop
:target: https://github.com/proxmoxer/proxmoxer/actions
.. |develop_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/develop
:target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=develop
.. |pypi_version| image:: https://img.shields.io/pypi/v/proxmoxer.svg
:target: https://pypi.python.org/pypi/proxmoxer
.. |pypi_downloads| image:: https://img.shields.io/pypi/dm/proxmoxer.svg
:target: https://pypi.python.org/pypi/proxmoxer
07070100000012000081A4000000000000000000000001675E3B1A0000001C000000000000000000000000000000000000002500000000proxmoxer-2.2.0/dev_requirements.txttwine
setuptools
pre-commit
07070100000013000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001A00000000proxmoxer-2.2.0/proxmoxer07070100000014000081A4000000000000000000000001675E3B1A00000092000000000000000000000000000000000000002600000000proxmoxer-2.2.0/proxmoxer/__init__.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2024"
__version__ = "2.2.0"
__license__ = "MIT"
from .core import * # noqa
07070100000015000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002300000000proxmoxer-2.2.0/proxmoxer/backends07070100000016000081A4000000000000000000000001675E3B1A0000005F000000000000000000000000000000000000002F00000000proxmoxer-2.2.0/proxmoxer/backends/__init__.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
07070100000017000081A4000000000000000000000001675E3B1A000014DB000000000000000000000000000000000000003300000000proxmoxer-2.2.0/proxmoxer/backends/command_base.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import json
import logging
import platform
import re
from itertools import chain
from shlex import split as shell_split
from proxmoxer.core import SERVICES
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
from shlex import join
def shell_join(args):
return join(args)
except ImportError:
from shlex import quote
def shell_join(args):
return " ".join([quote(arg) for arg in args])
class Response:
def __init__(self, content, status_code):
self.status_code = status_code
self.content = content
self.text = str(content)
self.headers = {"content-type": "application/json"}
def __str__(self):
return f"Response ({self.status_code}) {self.content}"
class CommandBaseSession:
    """Shared request logic for backends that drive the CLI tools
    (``pvesh``/``pmgsh``) instead of the HTTP API.

    Subclasses must implement :meth:`_exec` (run an argument list, returning
    ``(stdout, stderr)``) and :meth:`upload_file_obj` (copy a local file
    object to a path on the Proxmox host).
    """

    def __init__(
        self,
        service="PVE",
        timeout=5,
        sudo=False,
    ):
        self.service = service.lower()
        self.timeout = timeout
        self.sudo = sudo

    def _exec(self, cmd):
        """Run ``cmd`` (a list of arguments) and return ``(stdout, stderr)``."""
        raise NotImplementedError()

    # noinspection PyUnusedLocal
    def request(self, method, url, data=None, params=None, headers=None):
        """Translate an HTTP-style request into a ``<service>sh`` CLI call.

        :param method: HTTP verb; POST/PUT are mapped to the CLI verbs
            ``create``/``set``
        :param url: API path to invoke
        :param data: body parameters, passed as ``-key value`` flags
        :param params: query parameters, passed the same way as ``data``
        :param headers: ignored; kept for interface compatibility with the
            HTTPS backend
        :return: a ``Response`` faking an HTTP status code and body
        """
        method = method.lower()
        data = data or {}
        params = params or {}
        url = url.strip()
        cmd = {"post": "create", "put": "set"}.get(method, method)

        # separate out qemu exec commands to split into multiple argument pairs (issue#89)
        data_command = None
        if "/agent/exec" in url:
            data_command = data.get("command")
            if data_command is not None:
                del data["command"]

        # for 'upload' call some workaround
        tmp_filename = ""
        if url.endswith("upload"):
            # copy file to temporary location on proxmox host
            tmp_filename, _ = self._exec(
                [
                    "python3",
                    "-c",
                    "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
                ]
            )
            # bug fix: some _exec implementations (e.g. LocalSession,
            # SshParamikoSession) already return decoded strings; the old
            # unconditional str(tmp_filename, "utf-8") raised TypeError then.
            if isinstance(tmp_filename, bytes):
                tmp_filename = str(tmp_filename, "utf-8")
            self.upload_file_obj(data["filename"], tmp_filename)
            data["filename"] = data["filename"].name
            data["tmpfilename"] = tmp_filename

        command = [f"{self.service}sh", cmd, url]
        # convert the options dict into a 2-tuple with the key formatted as a flag
        option_pairs = []
        for k, v in chain(data.items(), params.items()):
            try:
                option_pairs.append((f"-{k}", str(v, "utf-8")))
            except TypeError:
                option_pairs.append((f"-{k}", str(v)))
        # add back in all the command arguments as their own pairs
        if data_command is not None:
            if isinstance(data_command, list):
                command_arr = data_command
            elif "Windows" not in platform.platform():
                command_arr = shell_split(data_command)
            else:
                # bug fix: previously command_arr was left undefined on
                # Windows (NameError below). shlex.split() only implements
                # POSIX quoting rules, so on Windows the command string is
                # passed through as a single argument instead.
                command_arr = [data_command]
            for arg in command_arr:
                option_pairs.append(("-command", arg))
        # expand the list of 2-tuples into a flat list
        options = [val for pair in option_pairs for val in pair]
        additional_options = SERVICES[self.service.upper()].get("cli_additional_options", [])
        full_cmd = command + options + additional_options

        if self.sudo:
            full_cmd = ["sudo"] + full_cmd

        stdout, stderr = self._exec(full_cmd)

        def is_http_status_string(s):
            # e.g. "501 not implemented" at the start of a stderr line
            return re.match(r"\d\d\d [a-zA-Z]", str(s))

        if stderr:
            # assume if we got a task ID that the request was successful
            task_id_pattern = re.compile(
                r"UPID:[\w-]+:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:\w+:[\w\._-]+:[\w\.@_-]+:\w*"
            )
            if task_id_pattern.search(str(stdout)) or task_id_pattern.search(str(stderr)):
                status_code = 200
            else:
                # sometimes contains extra text like 'trying to acquire lock...OK'
                status_code = next(
                    (
                        int(line.split()[0])
                        for line in stderr.splitlines()
                        if is_http_status_string(line)
                    ),
                    500,
                )
        else:
            status_code = 200
        if stdout:
            return Response(stdout, status_code)
        return Response(stderr, status_code)

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` on the Proxmox host."""
        raise NotImplementedError()
class JsonSimpleSerializer:
    """Parse backend output as JSON, wrapping unparseable content as errors."""

    def loads(self, response):
        """Decode ``response.content`` as a JSON document."""
        try:
            parsed = json.loads(response.content)
        except (ValueError, UnicodeDecodeError):
            return {"errors": response.content}
        return parsed

    def loads_errors(self, response):
        """Decode ``response.text`` and extract its "errors" member, if any."""
        try:
            parsed = json.loads(response.text)
        except (ValueError, UnicodeDecodeError):
            return {"errors": response.content}
        return parsed.get("errors")
class CommandBaseBackend:
    """Common plumbing shared by the CLI (non-HTTP) backends."""

    def __init__(self):
        # Subclasses replace these with a live session and target host name.
        self.target = None
        self.session = None

    def get_session(self):
        """Return the session object used to issue requests."""
        return self.session

    def get_base_url(self):
        """CLI backends address API paths directly, so there is no URL prefix."""
        return ""

    def get_serializer(self):
        """Return a serializer that understands pvesh/pmgsh JSON output."""
        return JsonSimpleSerializer()
07070100000018000081A4000000000000000000000001675E3B1A00002EEB000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/backends/https.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import io
import json
import logging
import os
import platform
import sys
import time
from shlex import split as shell_split
from proxmoxer.core import SERVICES, AuthenticationError, config_failure
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
STREAMING_SIZE_THRESHOLD = 10 * 1024 * 1024 # 10 MiB
SSL_OVERFLOW_THRESHOLD = 2147483135 # 2^31 - 1 - 512
try:
import requests
from requests.auth import AuthBase
from requests.cookies import cookiejar_from_dict
# Disable warnings about using untrusted TLS
requests.packages.urllib3.disable_warnings()
except ImportError:
logger.error("Chosen backend requires 'requests' module\n")
sys.exit(1)
class ProxmoxHTTPAuthBase(AuthBase):
    """Base class for the HTTPS authentication flavours.

    Holds the connection options shared by every auth scheme and provides
    no-op hooks that subclasses override.
    """

    def __init__(self, timeout=5, service="PVE", verify_ssl=False, cert=None):
        self.timeout = timeout
        self.service = service
        self.verify_ssl = verify_ssl
        self.cert = cert

    def __call__(self, req):
        """Attach nothing; the base scheme leaves the request untouched."""
        return req

    def get_cookies(self):
        """Return an empty cookie jar; subclasses add their auth cookie."""
        return cookiejar_from_dict({})

    def get_tokens(self):
        """Return ``(ticket, csrf_token)``; the base scheme has neither."""
        return None, None
class ProxmoxHTTPAuth(ProxmoxHTTPAuthBase):
    """Password/ticket based authentication for the HTTPS backend.

    Obtains an access ticket and CSRF prevention token at construction time
    and transparently renews the ticket as requests are made.
    """

    # number of seconds between renewing access tickets (must be less than 7200 to function correctly)
    # if calls are made less frequently than 2 hrs, using the API token auth is recommended
    renew_age = 3600

    def __init__(self, username, password, otp=None, base_url="", **kwargs):
        """
        :param username: user to authenticate as (including realm, e.g. "root@pam")
        :param password: password used for the initial ticket request
        :param otp: one-time password for Two Factor Authentication, if enabled
        :param base_url: API base URL used to reach the /access/ticket endpoint
        :raises AuthenticationError: if the ticket request is rejected
        """
        super().__init__(**kwargs)
        self.base_url = base_url
        self.username = username
        self.pve_auth_ticket = ""

        self._get_new_tokens(password=password, otp=otp)

    def _get_new_tokens(self, password=None, otp=None):
        """Request a fresh auth ticket and CSRF token from /access/ticket.

        :param password: password for the initial request; when None, the
            current (unexpired) ticket is used as the password to renew itself
        :param otp: one-time password, only needed on the initial request
        :raises AuthenticationError: on rejected credentials or missing TFA
        """
        if password is None:
            # refresh from existing (unexpired) ticket
            password = self.pve_auth_ticket

        data = {"username": self.username, "password": password}
        if otp:
            data["otp"] = otp

        response_data = requests.post(
            self.base_url + "/access/ticket",
            verify=self.verify_ssl,
            timeout=self.timeout,
            data=data,
            cert=self.cert,
        ).json()["data"]
        if response_data is None:
            raise AuthenticationError(
                "Couldn't authenticate user: {0} to {1}".format(
                    self.username, self.base_url + "/access/ticket"
                )
            )
        if response_data.get("NeedTFA") is not None:
            raise AuthenticationError(
                "Couldn't authenticate user: missing Two Factor Authentication (TFA)"
            )

        # record when this ticket was issued so __call__ can age it out
        self.birth_time = time.monotonic()
        self.pve_auth_ticket = response_data["ticket"]
        self.csrf_prevention_token = response_data["CSRFPreventionToken"]

    def get_cookies(self):
        # the auth ticket is presented as a "<service>AuthCookie" cookie
        return cookiejar_from_dict({self.service + "AuthCookie": self.pve_auth_ticket})

    def get_tokens(self):
        # current (ticket, CSRF prevention token) pair
        return self.pve_auth_ticket, self.csrf_prevention_token

    def __call__(self, req):
        # refresh ticket if older than `renew_age`
        time_diff = time.monotonic() - self.birth_time
        if time_diff >= self.renew_age:
            logger.debug(f"refreshing ticket (age {time_diff})")
            self._get_new_tokens()

        # only attach CSRF token if needed (reduce interception risk)
        if req.method != "GET":
            req.headers["CSRFPreventionToken"] = self.csrf_prevention_token
        return req
class ProxmoxHTTPApiTokenAuth(ProxmoxHTTPAuthBase):
    """Authenticate every request with a static API token header."""

    def __init__(self, username, token_name, token_value, **kwargs):
        super().__init__(**kwargs)
        self.username = username
        self.token_name = token_name
        self.token_value = token_value

    def __call__(self, req):
        """Attach the service-specific ``Authorization`` header to ``req``."""
        separator = SERVICES[self.service]["token_separator"]
        req.headers["Authorization"] = "{0}APIToken={1}!{2}{3}{4}".format(
            self.service,
            self.username,
            self.token_name,
            separator,
            self.token_value,
        )
        req.cert = self.cert
        return req
class JsonSerializer:
    """Decode Proxmox HTTPS API responses (a JSON envelope with a "data" key)."""

    content_types = [
        "application/json",
        "application/x-javascript",
        "text/javascript",
        "text/x-javascript",
        "text/x-json",
    ]

    def get_accept_types(self):
        """Return the Accept header value listing every supported JSON type."""
        return ", ".join(self.content_types)

    def loads(self, response):
        """Return the "data" member of the JSON body, or an errors wrapper."""
        try:
            decoded = response.content.decode("utf-8")
            return json.loads(decoded)["data"]
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}

    def loads_errors(self, response):
        """Return the "errors" member of the JSON body, if present."""
        try:
            parsed = json.loads(response.text)
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}
        return parsed.get("errors")
# pylint:disable=arguments-renamed
class ProxmoxHttpSession(requests.Session):
    """A requests.Session that fills in auth, cookies, timeout and TLS
    verification from the configured Proxmox auth object, and switches to
    streaming multipart encoding for large file uploads."""

    def request(
        self,
        method,
        url,
        params=None,
        data=None,
        headers=None,
        cookies=None,
        files=None,
        auth=None,
        timeout=None,
        allow_redirects=True,
        proxies=None,
        hooks=None,
        stream=None,
        verify=None,
        cert=None,
        serializer=None,  # accepted for interface compatibility; unused here
    ):
        a = auth or self.auth
        c = cookies or self.cookies

        # set verify flag from auth if request does not have this parameter explicitly
        if verify is None:
            verify = a.verify_ssl
        if timeout is None:
            timeout = a.timeout

        # pull cookies from auth if not present
        if (not c) and a:
            cookies = a.get_cookies()

        # filter out streams
        files = files or {}
        data = data or {}
        total_file_size = 0
        # iterate over a copy since file-like values are deleted from `data`
        for k, v in data.copy().items():
            # split qemu exec commands for proper parsing by PVE (issue#89)
            if k == "command" and url.endswith("agent/exec"):
                if isinstance(v, list):
                    data[k] = v
                elif "Windows" not in platform.platform():
                    data[k] = shell_split(v)
            # move file-like values out of `data` into `files`
            if isinstance(v, io.IOBase):
                total_file_size += get_file_size(v)

                # add in filename from file pointer (patch for https://github.com/requests/toolbelt/pull/316)
                # add Content-Type since Proxmox requires it (https://bugzilla.proxmox.com/show_bug.cgi?id=4344)
                files[k] = (requests.utils.guess_filename(v), v, "application/octet-stream")
                del data[k]

        # if there are any large files, send all data and files using streaming multipart encoding
        if total_file_size > STREAMING_SIZE_THRESHOLD:
            try:
                # pylint:disable=import-outside-toplevel
                from requests_toolbelt import MultipartEncoder

                encoder = MultipartEncoder(fields={**data, **files})
                data = encoder
                files = None
                headers = {"Content-Type": encoder.content_type}
            except ImportError:
                # without the toolbelt, fall back to in-memory upload
                # if the files will cause issues with the SSL 2GiB limit (https://bugs.python.org/issue42853#msg384566)
                if total_file_size > SSL_OVERFLOW_THRESHOLD:
                    logger.warning(
                        "Install 'requests_toolbelt' to add support for files larger than 2GiB"
                    )
                    raise OverflowError("Unable to upload a payload larger than 2 GiB")
                else:
                    logger.info(
                        "Installing 'requests_toolbelt' will decrease memory used during upload"
                    )

        return super().request(
            method,
            url,
            params,
            data,
            headers,
            cookies,
            files,
            auth,
            timeout,
            allow_redirects,
            proxies,
            hooks,
            stream,
            verify,
            cert,
        )
class Backend:
    """HTTPS backend: talks to the Proxmox API over TLS using either
    password (ticket) or API token authentication."""

    def __init__(
        self,
        host,
        user=None,
        password=None,
        otp=None,
        port=None,
        verify_ssl=True,
        mode="json",
        timeout=5,
        token_name=None,
        token_value=None,
        path_prefix=None,
        service="PVE",
        cert=None,
    ):
        self.cert = cert
        host_port = ""
        # Split an optional ":port" suffix off the host; more than two ":"
        # means an IPv6 literal (bracketed or bare).
        if len(host.split(":")) > 2:  # IPv6
            if host.startswith("["):
                if "]:" in host:
                    host, host_port = host.rsplit(":", 1)
            else:
                # bare IPv6 literal: bracket it for use inside a URL
                host = f"[{host}]"
        elif ":" in host:
            host, host_port = host.split(":")

        # a numeric port embedded in `host` overrides the `port` argument
        port = host_port if host_port.isdigit() else port

        # if a port is not specified, use the default port for this service
        if not port:
            port = SERVICES[service]["default_port"]
        self.mode = mode
        if path_prefix is not None:
            self.base_url = f"https://{host}:{port}/{path_prefix}/api2/{mode}"
        else:
            self.base_url = f"https://{host}:{port}/api2/{mode}"

        if token_name is not None:
            # API token authentication (no ticket refresh required)
            if "token" not in SERVICES[service]["supported_https_auths"]:
                config_failure("{} does not support API Token authentication", service)

            self.auth = ProxmoxHTTPApiTokenAuth(
                user,
                token_name,
                token_value,
                verify_ssl=verify_ssl,
                timeout=timeout,
                service=service,
                cert=self.cert,
            )
        elif password is not None:
            # user/password (ticket) authentication
            if "password" not in SERVICES[service]["supported_https_auths"]:
                config_failure("{} does not support password authentication", service)

            self.auth = ProxmoxHTTPAuth(
                user,
                password,
                otp,
                base_url=self.base_url,
                verify_ssl=verify_ssl,
                timeout=timeout,
                service=service,
                cert=self.cert,
            )
        else:
            config_failure("No valid authentication credentials were supplied")

    def get_session(self):
        """Build a ProxmoxHttpSession wired up with this backend's auth."""
        session = ProxmoxHttpSession()
        session.cert = self.cert
        session.auth = self.auth
        # cookies are taken from the auth
        session.headers["Connection"] = "keep-alive"
        session.headers["accept"] = self.get_serializer().get_accept_types()
        return session

    def get_base_url(self):
        """Return the API base URL, e.g. ``https://host:8006/api2/json``."""
        return self.base_url

    def get_serializer(self):
        """Return the serializer for this mode (only "json" is supported)."""
        assert self.mode == "json"
        return JsonSerializer()

    def get_tokens(self):
        """Return the in-use auth and csrf tokens if using user/password auth."""
        return self.auth.get_tokens()
def get_file_size(file_obj):
    """Return the total number of bytes in ``file_obj``.

    The file cursor is restored to its original position before returning.

    :param file_obj: seekable file object to measure
    :type file_obj: file object
    :return: total bytes in the file object
    :rtype: int
    """
    original_position = file_obj.tell()
    # measure by seeking to the end, then restore the caller's position
    file_obj.seek(0, os.SEEK_END)
    total = file_obj.tell()
    file_obj.seek(original_position)
    return total
def get_file_size_partial(file_obj):
    """Return the number of bytes from the current cursor to the end of file.

    The file cursor is restored to its original position before returning.

    :param file_obj: seekable file object to measure
    :type file_obj: file object
    :return: remaining bytes in the file object
    :rtype: int
    """
    original_position = file_obj.tell()
    # distance between the caller's cursor and the end of the file
    file_obj.seek(0, os.SEEK_END)
    remaining = file_obj.tell() - original_position
    file_obj.seek(original_position)
    return remaining
07070100000019000081A4000000000000000000000001675E3B1A00000307000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/backends/local.py__author__ = "Markus Reiter"
__copyright__ = "(c) Markus Reiter 2022"
__license__ = "MIT"
import shutil
from subprocess import PIPE, Popen
from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession
class LocalSession(CommandBaseSession):
    """Run pvesh/pmgsh commands directly on the local machine."""

    def _exec(self, cmd):
        """Execute ``cmd`` locally; return its decoded ``(stdout, stderr)``."""
        process = Popen(cmd, stdout=PIPE, stderr=PIPE)
        out, err = process.communicate(timeout=self.timeout)
        return out.decode(), err.decode()

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` on the local filesystem."""
        with open(remote_path, "wb") as destination:
            shutil.copyfileobj(file_obj, destination)
class Backend(CommandBaseBackend):
    """CLI backend that executes commands on the local host."""

    def __init__(self, *args, **kwargs):
        self.target = "localhost"
        self.session = LocalSession(*args, **kwargs)
0707010000001A000081A4000000000000000000000001675E3B1A000006DD000000000000000000000000000000000000002E00000000proxmoxer-2.2.0/proxmoxer/backends/openssh.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import logging
from proxmoxer.backends.command_base import (
CommandBaseBackend,
CommandBaseSession,
shell_join,
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import openssh_wrapper
except ImportError:
import sys
logger.error("Chosen backend requires 'openssh_wrapper' module\n")
sys.exit(1)
class OpenSSHSession(CommandBaseSession):
    """Run pvesh/pmgsh commands on a remote host via openssh_wrapper."""

    def __init__(
        self,
        host,
        user,
        config_file=None,
        port=22,
        identity_file=None,
        forward_ssh_agent=False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.host = host
        self.user = user
        self.config_file = config_file
        self.port = port
        self.forward_ssh_agent = forward_ssh_agent
        self.identity_file = identity_file

        # connect eagerly so configuration errors surface at construction
        self.ssh_client = self._connect()

    def _connect(self):
        """Open the SSH connection using the stored connection options."""
        return openssh_wrapper.SSHConnection(
            self.host,
            login=self.user,
            port=str(self.port),  # openssh_wrapper complains if this is an int
            configfile=self.config_file,
            identity_file=self.identity_file,
            timeout=self.timeout,
        )

    def _exec(self, cmd):
        """Run ``cmd`` remotely; return the run's ``(stdout, stderr)``."""
        result = self.ssh_client.run(shell_join(cmd), forward_ssh_agent=self.forward_ssh_agent)
        return result.stdout, result.stderr

    def upload_file_obj(self, file_obj, remote_path):
        """scp ``file_obj`` to ``remote_path`` on the remote host."""
        self.ssh_client.scp((file_obj,), target=remote_path)
class Backend(CommandBaseBackend):
    """CLI backend tunnelling commands over OpenSSH."""

    def __init__(self, *args, **kwargs):
        session = OpenSSHSession(*args, **kwargs)
        self.session = session
        self.target = session.host
0707010000001B000081A4000000000000000000000001675E3B1A00000845000000000000000000000000000000000000003300000000proxmoxer-2.2.0/proxmoxer/backends/ssh_paramiko.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
# spell-checker:ignore putfo
import logging
import os
from proxmoxer.backends.command_base import (
CommandBaseBackend,
CommandBaseSession,
shell_join,
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import paramiko
except ImportError:
import sys
logger.error("Chosen backend requires 'paramiko' module\n")
sys.exit(1)
class SshParamikoSession(CommandBaseSession):
    """Run pvesh/pmgsh commands on a remote host over paramiko SSH."""

    def __init__(self, host, user, password=None, private_key_file=None, port=22, **kwargs):
        super().__init__(**kwargs)
        self.host = host
        self.user = user
        self.password = password
        self.private_key_file = private_key_file
        self.port = port

        # connect eagerly so credential errors surface at construction
        self.ssh_client = self._connect()

    def _connect(self):
        """Create, connect, and return the paramiko SSH client."""
        client = paramiko.SSHClient()
        # accept previously-unseen host keys automatically
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        if self.private_key_file:
            key_filename = os.path.expanduser(self.private_key_file)
        else:
            key_filename = None

        client.connect(
            self.host,
            username=self.user,
            allow_agent=(not self.password),
            look_for_keys=True,
            key_filename=key_filename,
            password=self.password,
            timeout=self.timeout,
            port=self.port,
        )
        return client

    def _exec(self, cmd):
        """Run ``cmd`` in a fresh channel; return decoded ``(stdout, stderr)``."""
        channel = self.ssh_client.get_transport().open_session()
        channel.exec_command(shell_join(cmd))
        out = channel.makefile("rb", -1).read().decode()
        err = channel.makefile_stderr("rb", -1).read().decode()
        return out, err

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` on the remote host via SFTP."""
        sftp = self.ssh_client.open_sftp()
        sftp.putfo(file_obj, remote_path)
        sftp.close()
class Backend(CommandBaseBackend):
    """CLI backend tunnelling commands over a paramiko SSH connection."""

    def __init__(self, *args, **kwargs):
        session = SshParamikoSession(*args, **kwargs)
        self.session = session
        self.target = session.host
0707010000001C000081A4000000000000000000000001675E3B1A00001EC6000000000000000000000000000000000000002200000000proxmoxer-2.2.0/proxmoxer/core.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
# spell-checker:ignore urlunsplit
import importlib
import logging
import posixpath
from http import client as httplib
from urllib import parse as urlparse
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
# https://metacpan.org/pod/AnyEvent::HTTP
# Pseudo HTTP status codes defined by AnyEvent::HTTP for transport-level
# failures; used below to label status codes that httplib doesn't know.
ANYEVENT_HTTP_STATUS_CODES = {
    595: "Errors during connection establishment, proxy handshake",
    596: "Errors during TLS negotiation, request sending and header processing",
    597: "Errors during body receiving or processing",
    598: "User aborted request via on_header or on_body",
    599: "Other, usually nonretryable, errors (garbled URL etc.)",
}

# Per-service capability table: which backends and HTTPS auth schemes each
# Proxmox product supports, its default API port, plus token/CLI quirks.
SERVICES = {
    "PVE": {
        "supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
        "supported_https_auths": ["password", "token"],
        "default_port": 8006,
        "token_separator": "=",
        "cli_additional_options": ["--output-format", "json"],
    },
    "PMG": {
        "supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
        "supported_https_auths": ["password"],
        "default_port": 8006,
    },
    "PBS": {
        "supported_backends": ["https"],
        "supported_https_auths": ["password", "token"],
        "default_port": 8007,
        "token_separator": ":",
    },
}
def config_failure(message, *args):
    """Abort with ``message`` formatted with ``args``.

    :raises NotImplementedError: always; signals an unsupported configuration
    """
    formatted = message.format(*args)
    raise NotImplementedError(formatted)
class ResourceException(Exception):
    """
    An Exception thrown when a Proxmox API call fails.
    """

    def __init__(self, status_code, status_message, content, errors=None):
        """
        Create a new ResourceException

        :param status_code: The HTTP status code (faked by non-HTTP backends)
        :type status_code: int
        :param status_message: HTTP status message (faked by non-HTTP backends)
        :type status_message: str
        :param content: Extended information on what went wrong
        :type content: str
        :param errors: Any specific errors that were encountered (converted to string), defaults to None
        :type errors: Optional[object], optional
        """
        self.status_code = status_code
        self.status_message = status_message
        # note: self.content keeps the original content, without the errors suffix
        self.content = content
        self.errors = errors
        detail = content if errors is None else f"{content} - {errors}"
        super().__init__(f"{status_code} {status_message}: {detail}".strip())
class AuthenticationError(Exception):
    """Raised when authenticating against the Proxmox API fails."""
class ProxmoxResource:
    """A chainable handle on one Proxmox API path.

    Attribute access and calls extend the URL (e.g.
    ``api.nodes("pve1").qemu(100)``), and the HTTP-verb methods issue the
    request through the backend session stored in ``_store``.
    """

    def __init__(self, **kwargs):
        # _store carries base_url, session and serializer down the chain
        self._store = kwargs

    def __repr__(self):
        return f"ProxmoxResource ({self._store.get('base_url')})"

    def __getattr__(self, item):
        """Extend the URL with ``item``, returning a new child resource."""
        if item.startswith("_"):
            # don't swallow private/dunder lookups into URL segments
            raise AttributeError(item)

        kwargs = self._store.copy()
        kwargs["base_url"] = self.url_join(self._store["base_url"], item)

        return ProxmoxResource(**kwargs)

    def url_join(self, base, *args):
        """Append path segments ``args`` to the URL ``base``."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(base)
        # ensure there is a path to join onto
        path = path if len(path) else "/"
        path = posixpath.join(path, *[str(x) for x in args])
        return urlparse.urlunsplit([scheme, netloc, path, query, fragment])

    def __call__(self, resource_id=None):
        """Extend the URL with ``resource_id``.

        Accepts a "/"-separated string, an iterable of segments, or any
        stringifiable id; None or "" returns self unchanged.
        """
        if resource_id in (None, ""):
            return self

        if isinstance(resource_id, (bytes, str)):
            resource_id = resource_id.split("/")
        elif not isinstance(resource_id, (tuple, list)):
            resource_id = [str(resource_id)]

        kwargs = self._store.copy()
        if resource_id is not None:
            kwargs["base_url"] = self.url_join(self._store["base_url"], *resource_id)
        return ProxmoxResource(**kwargs)

    def _request(self, method, data=None, params=None):
        """Issue ``method`` on this resource's URL and deserialize the result.

        :param method: HTTP verb (upper case) to issue
        :param data: body parameters (None-valued keys are stripped in place)
        :param params: query parameters (None-valued keys are stripped in place)
        :raises ResourceException: for any response with status >= 400
        :return: deserialized response body for 2xx responses
        """
        url = self._store["base_url"]
        if data:
            logger.info(f"{method} {url} {data}")
        else:
            logger.info(f"{method} {url}")

        # passing None values to pvesh command breaks it, let's remove them just as requests library does
        # helpful when dealing with function default values higher in the chain, no need to clean up in multiple places
        if params:
            # remove keys that are set to None
            params_none_keys = [k for (k, v) in params.items() if v is None]
            for key in params_none_keys:
                del params[key]

        if data:
            # remove keys that are set to None
            data_none_keys = [k for (k, v) in data.items() if v is None]
            for key in data_none_keys:
                del data[key]

        resp = self._store["session"].request(method, url, data=data, params=params)
        logger.debug(f"Status code: {resp.status_code}, output: {resp.content!r}")

        if resp.status_code >= 400:
            if hasattr(resp, "reason"):
                raise ResourceException(
                    resp.status_code,
                    httplib.responses.get(
                        resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code)
                    ),
                    resp.reason,
                    errors=(self._store["serializer"].loads_errors(resp)),
                )
            else:
                raise ResourceException(
                    resp.status_code,
                    httplib.responses.get(
                        resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code)
                    ),
                    resp.text,
                )
        elif 200 <= resp.status_code <= 299:
            return self._store["serializer"].loads(resp)
        # NOTE(review): 3xx responses fall through here and return None

    def get(self, *args, **params):
        """GET this resource (optionally extended by ``args``)."""
        return self(args)._request("GET", params=params)

    def post(self, *args, **data):
        """POST ``data`` to this resource (optionally extended by ``args``)."""
        return self(args)._request("POST", data=data)

    def put(self, *args, **data):
        """PUT ``data`` to this resource (optionally extended by ``args``)."""
        return self(args)._request("PUT", data=data)

    def delete(self, *args, **params):
        """DELETE this resource (optionally extended by ``args``)."""
        return self(args)._request("DELETE", params=params)

    def create(self, *args, **data):
        """Alias for :meth:`post`, mirroring the pvesh verb name."""
        return self.post(*args, **data)

    def set(self, *args, **data):
        """Alias for :meth:`put`, mirroring the pvesh verb name."""
        return self.put(*args, **data)
class ProxmoxAPI(ProxmoxResource):
    """Entry point: selects a service (PVE/PMG/PBS) and a backend, then acts
    as the root ProxmoxResource for building API calls."""

    def __init__(self, host=None, backend="https", service="PVE", **kwargs):
        """
        :param host: target host (not accepted by the "local" backend)
        :param backend: one of "local", "https", "openssh", "ssh_paramiko"
        :param service: "PVE", "PMG", or "PBS" (case-insensitive)
        :param kwargs: passed through to the chosen backend's constructor
        :raises NotImplementedError: via config_failure for unsupported
            service/backend combinations
        """
        super().__init__(**kwargs)
        service = service.upper()
        backend = backend.lower()

        # throw error for unsupported services
        if service not in SERVICES.keys():
            config_failure("{} service is not supported", service)

        # throw error for unsupported backend for service
        if backend not in SERVICES[service]["supported_backends"]:
            config_failure("{} service does not support {} backend", service, backend)

        if host is not None:
            if backend == "local":
                config_failure("{} backend does not support host keyword", backend)
            else:
                kwargs["host"] = host

        kwargs["service"] = service

        # load backend module dynamically from proxmoxer.backends
        self._backend = importlib.import_module(f".backends.{backend}", "proxmoxer").Backend(
            **kwargs
        )
        self._backend_name = backend

        self._store = {
            "base_url": self._backend.get_base_url(),
            "session": self._backend.get_session(),
            "serializer": self._backend.get_serializer(),
        }

    def __repr__(self):
        # CLI backends expose a target host; HTTPS falls back to the base URL
        dest = getattr(self._backend, "target", self._store.get("base_url"))
        return f"ProxmoxAPI ({self._backend_name} backend for {dest})"

    def get_tokens(self):
        """Return the auth and csrf tokens.

        Returns (None, None) if the backend is not https using password authentication.
        """
        if self._backend_name != "https":
            return None, None

        return self._backend.get_tokens()
0707010000001D000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002000000000proxmoxer-2.2.0/proxmoxer/tools0707010000001E000081A4000000000000000000000001675E3B1A000000D0000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/tools/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
from . import * # noqa: F401 F403
from .files import * # noqa: F401 F403
from .tasks import * # noqa: F401 F403
0707010000001F000081A4000000000000000000000001675E3B1A000028DA000000000000000000000000000000000000002900000000proxmoxer-2.2.0/proxmoxer/tools/files.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"
import hashlib
import logging
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin, urlparse
from proxmoxer import ProxmoxResource, ResourceException
from proxmoxer.tools.tasks import Tasks
CHECKSUM_CHUNK_SIZE = 16384 # read 16k at a time while calculating the checksum for upload
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import requests
except ImportError:
logger.error("Files tools requires 'requests' module\n")
sys.exit(1)
class ChecksumInfo:
    """A checksum algorithm name paired with its hex digest length."""

    def __init__(self, name: str, hex_size: int):
        self.name = name
        self.hex_size = hex_size

    def __str__(self):
        return self.name

    def __repr__(self):
        return "{} ({} digits)".format(self.name, self.hex_size)
class SupportedChecksums(Enum):
    """
    An Enum of the checksum types supported by Proxmox
    """

    # ordered by preference for longer/stronger checksums first
    # NOTE(review): SHA224 (56 digits) is listed before the longer SHA384
    # (96 digits), contradicting the stated ordering. Iteration order drives
    # which algorithm callers pick first, so confirm intent before reordering.
    SHA512 = ChecksumInfo("sha512", 128)
    SHA256 = ChecksumInfo("sha256", 64)
    SHA224 = ChecksumInfo("sha224", 56)
    SHA384 = ChecksumInfo("sha384", 96)
    MD5 = ChecksumInfo("md5", 32)
    SHA1 = ChecksumInfo("sha1", 40)
class Files:
    """
    Ease-of-use tools for interacting with the uploading/downloading files
    in Proxmox VE
    """

    def __init__(self, prox: ProxmoxResource, node: str, storage: str):
        """
        :param prox: API root used to issue the storage and task calls
        :param node: name of the node holding the target storage
        :param storage: name of the storage to upload/download into
        """
        self._prox = prox
        self._node = node
        self._storage = storage

    def __repr__(self):
        return f"Files ({self._node}/{self._storage} at {self._prox})"

    def upload_local_file_to_storage(
        self,
        filename: str,
        do_checksum_check: bool = True,
        blocking_status: bool = True,
    ):
        """Upload a local ISO/template file into this node's storage.

        :param filename: path of the local file to upload
        :param do_checksum_check: also send a checksum for server-side validation
        :param blocking_status: block until the upload task finishes
        :return: the task status dict, or None on error
        """
        file_path = Path(filename)

        if not file_path.is_file():
            logger.error(f'"{file_path.absolute()}" does not exist or is not a file')
            return None

        # init to None in case errors cause no values to be set
        upid: str = ""
        checksum: Optional[str] = None
        checksum_type: Optional[str] = None

        try:
            with open(file_path.absolute(), "rb") as f_obj:
                if do_checksum_check:
                    # iterate through SupportedChecksums and find the first one in hashlib.algorithms_available
                    for checksum_info in (v.value for v in SupportedChecksums):
                        if checksum_info.name in hashlib.algorithms_available:
                            checksum_type = checksum_info.name
                            break

                    if checksum_type is None:
                        logger.warning(
                            "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation"
                        )
                    else:
                        h = hashlib.new(checksum_type)

                        # Iterate through the file in CHECKSUM_CHUNK_SIZE size
                        for byte_block in iter(lambda: f_obj.read(CHECKSUM_CHUNK_SIZE), b""):
                            h.update(byte_block)
                        checksum = h.hexdigest()
                        logger.debug(
                            f"The {checksum_type} checksum of {file_path.absolute()} is {checksum}"
                        )

                        # reset to the start of the file so the upload can use the same file handle
                        f_obj.seek(0)

                params = {
                    "content": "iso" if file_path.absolute().name.endswith("iso") else "vztmpl",
                    "checksum-algorithm": checksum_type,
                    "checksum": checksum,
                    "filename": f_obj,
                }
                upid = self._prox.nodes(self._node).storage(self._storage).upload.post(**params)
        except OSError as e:
            logger.error(e)
            return None

        if blocking_status:
            return Tasks.blocking_status(self._prox, upid)
        else:
            # one-shot status query; the task may still be running
            return self._prox.nodes(self._node).tasks(upid).status.get()

    def download_file_to_storage(
        self,
        url: str,
        checksum: Optional[str] = None,
        checksum_type: Optional[str] = None,
        blocking_status: bool = True,
    ):
        """Have the Proxmox node download a file from ``url`` into storage.

        If both ``checksum`` and ``checksum_type`` are None they are
        auto-discovered from checksum files next to ``url``; passing exactly
        one of the two is an error.

        :param url: source URL the node should fetch
        :param checksum: expected checksum of the file, if known
        :param checksum_type: algorithm name for ``checksum``
        :param blocking_status: block until the download task finishes
        :return: the task status dict, or None on invalid arguments
        """
        file_info = self.get_file_info(url)
        filename = None
        if file_info is not None:
            filename = file_info.get("filename")

        if checksum is None and checksum_type is None:
            checksum, checksum_info = self.get_checksums_from_file_url(url, filename)
            checksum_type = checksum_info.name if checksum_info else None
        elif checksum is None or checksum_type is None:
            logger.error(
                "Must pass both checksum and checksum_type or leave both None for auto-discovery"
            )
            return None

        if checksum is None or checksum_type is None:
            logger.warning("Unable to discover checksum. Will not do checksum validation")

        params = {
            "checksum-algorithm": checksum_type,
            "url": url,
            "checksum": checksum,
            "content": "iso" if url.endswith("iso") else "vztmpl",
            "filename": filename,
        }
        upid = self._prox.nodes(self._node).storage(self._storage)("download-url").post(**params)

        if blocking_status:
            return Tasks.blocking_status(self._prox, upid)
        else:
            # one-shot status query; the task may still be running
            return self._prox.nodes(self._node).tasks(upid).status.get()

    def get_file_info(self, url: str):
        """Query the node for metadata about the file at ``url``.

        :return: the query-url-metadata response, or None on failure
        """
        try:
            return self._prox.nodes(self._node)("query-url-metadata").get(url=url)
        except ResourceException as e:
            logger.warning(f"Unable to get information for {url}: {e}")
            return None

    @staticmethod
    def get_checksums_from_file_url(
        url: str, filename: Optional[str] = None, preferred_type=SupportedChecksums.SHA512.value
    ):
        """Try to discover a checksum for ``url`` from published checksum files.

        :param url: URL of the target file
        :param filename: filename searched for inside checksum files;
            discovered from ``url`` when None
        :param preferred_type: the ChecksumInfo to try first
        :return: ``(checksum, ChecksumInfo)``, or ``(None, None)`` if not found
        """
        getters_by_quality = [
            Files._get_checksum_from_sibling_file,
            Files._get_checksum_from_extension,
            Files._get_checksum_from_extension_upper,
        ]

        # hacky way to try the preferred_type first while still trying all types with no duplicates
        all_types_with_priority = list(
            dict.fromkeys([preferred_type, *(map(lambda t: t.value, SupportedChecksums))])
        )
        for c_info in all_types_with_priority:
            for getter in getters_by_quality:
                checksum: Optional[str] = getter(url, c_info, filename)
                if checksum is not None:
                    logger.info(f"{getter} found {str(c_info)} checksum {checksum}")
                    return (checksum, c_info)
                else:
                    logger.debug(f"{getter} found no {str(c_info)} checksum")

        return (None, None)

    @staticmethod
    def _get_checksum_from_sibling_file(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file in the same path as the target file to discover the checksum

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. SHA256SUMS alongside the file
        sumfile_url = urljoin(url, (checksum_info.name + "SUMS").upper())
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file with a checksum extension added to the target file to discover the checksum

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. <file>.sha256 alongside the file
        sumfile_url = url + "." + checksum_info.name
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension_upper(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file with an upper-case checksum extension added to the target file to discover the checksum

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. <file>.SHA256 alongside the file
        sumfile_url = url + "." + checksum_info.name.upper()
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_helper(sumfile_url: str, filename: str, checksum_info: ChecksumInfo):
        """Fetch ``sumfile_url`` and scan it for a line mentioning ``filename``.

        :return: the leading ``checksum_info.hex_size`` characters of the
            matching line, or None if the file is unreachable or has no match
        """
        logger.debug(f"getting {sumfile_url}")
        try:
            resp = requests.get(sumfile_url, timeout=10)
        except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
            logger.info(f"Failed when trying to get {sumfile_url}")
            return None

        if resp.status_code == 200:
            for line in resp.iter_lines():
                line_str = line.decode("utf-8")
                # NOTE(review): this literal looks like it should interpolate
                # {filename}; confirm against upstream before changing it.
                logger.debug(f"checking for '(unknown)' in '{line_str}'")
                if filename in str(line_str):
                    # checksum files put the digest first on the line
                    return line_str[0 : checksum_info.hex_size]
        return None
07070100000020000081A4000000000000000000000001675E3B1A00000A9D000000000000000000000000000000000000002900000000proxmoxer-2.2.0/proxmoxer/tools/tasks.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import time
class Tasks:
    """
    Ease-of-use tools for interacting with the tasks endpoints
    in the Proxmox API.
    """

    @staticmethod
    def blocking_status(prox, task_id, timeout=300, polling_interval=1):
        """
        Turns getting the status of a Proxmox task into a blocking call
        by polling the API until the task completes

        :param prox: The Proxmox object used to query for status
        :type prox: ProxmoxAPI
        :param task_id: the UPID of the task
        :type task_id: str
        :param timeout: If the task does not complete in this time (in seconds) return None, defaults to 300
        :type timeout: int, optional
        :param polling_interval: the time to wait between checking for status updates, defaults to 1
        :type polling_interval: float, optional
        :return: the status of the task
        :rtype: dict
        """
        node = Tasks.decode_upid(task_id)["node"]
        deadline = time.monotonic() + timeout
        data = {"status": ""}
        while data["status"] != "stopped":
            data = prox.nodes(node).tasks(task_id).status.get()
            if deadline <= time.monotonic():
                # timed out before the task stopped
                data = None  # type: ignore
                break
            time.sleep(polling_interval)
        return data

    @staticmethod
    def decode_upid(upid):
        """
        Decodes the sections of a UPID into separate fields

        :param upid: a UPID string
        :type upid: str
        :return: The decoded information from the UPID
        :rtype: dict
        """
        parts = upid.split(":")
        if len(parts) != 9 or parts[0] != "UPID":
            raise AssertionError("UPID is not in the correct format")

        return {
            "upid": upid,
            "node": parts[1],
            "pid": int(parts[2], 16),
            "pstart": int(parts[3], 16),
            "starttime": int(parts[4], 16),
            "type": parts[5],
            "id": parts[6],
            "user": parts[7].split("!")[0],
            "comment": parts[8],
        }

    @staticmethod
    def decode_log(log_list):
        """
        Takes in a task's log data and returns a multiline string representation

        :param log_list: The log formatting returned by the Proxmox API
        :type log_list: list of dicts
        :return: a multiline string of the log
        :rtype: str
        """
        lines = [""] * len(log_list)
        for entry in log_list:
            # each entry carries its 1-based line number "n" and text "t"
            lines[entry["n"] - 1] = entry.get("t", "")
        return "\n".join(lines)
07070100000021000081A4000000000000000000000001675E3B1A00000039000000000000000000000000000000000000001F00000000proxmoxer-2.2.0/pyproject.toml[tool.black]
line-length = 100
target-version = ['py37']
07070100000022000081A4000000000000000000000001675E3B1A000001D2000000000000000000000000000000000000001A00000000proxmoxer-2.2.0/setup.cfg[pylint]
max-line-length = 100
# allow single letter variables
variable-rgx = [a-z0-9_]{1,30}$
[pylint.messages_control]
# let black handle line length
# ignore some python3-only features (f-strings)
disable = C0330, C0326, C0114, line-too-long, missing-function-docstring, consider-using-f-string,missing-class-docstring
[flake8]
max-line-length = 100
extend-ignore = E203, E501, F811
exclude = .git,__pycache__,old,build,dist,*.egg-info
[isort]
profile = black
07070100000023000081A4000000000000000000000001675E3B1A00000823000000000000000000000000000000000000001900000000proxmoxer-2.2.0/setup.py#!/usr/bin/env python
import codecs
import os
import re
import sys

from setuptools import setup

from proxmoxer import __version__ as proxmoxer_version

# When building a source distribution, generate README.txt from README.rst,
# downgrading Sphinx-only "code-block" directives to plain "::" literal
# blocks so the text renders without Sphinx.
if not os.path.exists("README.txt") and "sdist" in sys.argv:
    with codecs.open("README.rst", encoding="utf8") as f:
        rst = f.read()
    code_block = r"(:\n\n)?\.\. code-block::.*"
    rst = re.sub(code_block, "::", rst)
    with codecs.open("README.txt", encoding="utf8", mode="wb") as f:
        f.write(rst)

try:
    # Prefer the generated README.txt (sdist builds); fall back to README.rst.
    readme = "README.txt" if os.path.exists("README.txt") else "README.rst"
    # Use a context manager so the handle is closed deterministically
    # (previously the file object was left for the garbage collector).
    with codecs.open(readme, encoding="utf-8") as f:
        long_description = f.read()
except IOError:
    long_description = "Could not read README.txt"

setup(
    name="proxmoxer",
    version=proxmoxer_version,
    description="Python Wrapper for the Proxmox 2.x API (HTTP and SSH)",
    author="Oleg Butovich",
    author_email="obutovich@gmail.com",
    license="MIT",
    url="https://proxmoxer.github.io/docs/",
    download_url="http://pypi.python.org/pypi/proxmoxer",
    keywords=["proxmox", "api"],
    packages=["proxmoxer", "proxmoxer.backends", "proxmoxer.tools"],
    classifiers=[  # http://pypi.python.org/pypi?%3Aaction=list_classifiers
        "Development Status :: 5 - Production/Stable",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: System :: Clustering",
        "Topic :: System :: Monitoring",
        "Topic :: System :: Systems Administration",
    ],
    long_description=long_description,
    long_description_content_type="text/x-rst",
)
07070100000024000081A4000000000000000000000001675E3B1A000000A0000000000000000000000000000000000000002600000000proxmoxer-2.2.0/test_requirements.txt# required libraries for full functionality
openssh_wrapper
paramiko
requests
requests_toolbelt
# used by test framework
coveralls
pytest
pytest-cov
responses
07070100000025000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001600000000proxmoxer-2.2.0/tests07070100000026000081A4000000000000000000000001675E3B1A0000005C000000000000000000000000000000000000002200000000proxmoxer-2.2.0/tests/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
07070100000027000081A4000000000000000000000001675E3B1A00002E22000000000000000000000000000000000000002200000000proxmoxer-2.2.0/tests/api_mock.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import json
import re
from urllib.parse import parse_qsl, urlparse
import pytest
import responses
from requests_toolbelt import MultipartEncoder
@pytest.fixture()
def mock_pve():
    """Yield a RequestsMock pre-loaded with the PVE API mock registry.

    assert_all_requests_are_fired=False lets individual tests exercise
    only a subset of the registered routes.
    """
    with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps:
        yield rsps
class PVERegistry(responses.registries.FirstMatchRegistry):
    # Mock registry emulating a Proxmox VE API endpoint for the `responses`
    # library. Routes are registered in __init__ below.

    # Fake host/port all mocked routes are registered under.
    base_url = "https://1.2.3.4:1234/api2/json"

    # Headers returned with every mocked reply (mirrors a real
    # pve-api-daemon response).
    common_headers = {
        "Cache-Control": "max-age=0",
        "Connection": "close, Keep-Alive",
        "Pragma": "no-cache",
        "Server": "pve-api-daemon/3.0",
        "Content-Type": "application/json;charset=UTF-8",
    }
def __init__(self):
super().__init__()
for resp in self._generate_static_responses():
self.add(resp)
for resp in self._generate_dynamic_responses():
self.add(resp)
def _generate_static_responses(self):
resps = []
# Basic GET requests
resps.append(
responses.Response(
method="GET",
url=self.base_url + "/version",
json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}},
)
)
resps.append(
responses.Response(
method="POST",
url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"),
# "done" added to UPID so polling will terminate (status checking is tested elsewhere)
json={
"data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done",
"success": 1,
},
)
)
resps.append(
responses.Response(
method="POST",
url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"),
# "done" added to UPID so polling will terminate (status checking is tested elsewhere)
json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"},
)
)
resps.append(
responses.Response(
method="POST",
url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"),
status=500,
body="storage 'missing' does not exist",
)
)
return resps
def _generate_dynamic_responses(self):
resps = []
# Authentication
resps.append(
responses.CallbackResponse(
method="POST",
url=self.base_url + "/access/ticket",
callback=self._cb_password_auth,
)
)
# Session testing
resps.append(
responses.CallbackResponse(
method="GET",
url=self.base_url + "/fake/echo",
callback=self._cb_echo,
)
)
resps.append(
responses.CallbackResponse(
method="GET",
url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"),
callback=self._cb_echo,
)
)
resps.append(
responses.CallbackResponse(
method="GET",
url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"),
callback=self._cb_qemu_monitor,
)
)
resps.append(
responses.CallbackResponse(
method="GET",
url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"),
callback=self._cb_task_status,
)
)
resps.append(
responses.CallbackResponse(
method="GET",
url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"),
callback=self._cb_url_metadata,
)
)
return resps
###################################
# Callbacks for Dynamic Responses #
###################################
def _cb_echo(self, request):
body = request.body
if body is not None:
if isinstance(body, MultipartEncoder):
body = body.to_string() # really, to byte string
body = body if isinstance(body, str) else str(body, "utf-8")
resp = {
"method": request.method,
"url": request.url,
"headers": dict(request.headers),
"cookies": request._cookies.get_dict(),
"body": body,
# "body_json": dict(parse_qsl(request.body)),
}
return (200, self.common_headers, json.dumps(resp))
    def _cb_password_auth(self, request):
        """Mock /access/ticket: password auth, OTP challenge, and ticket refresh.

        Branches on the submitted form fields:
        * username "bad_auth"        -> 401, no ticket
        * username "otp" without otp -> 200 with NeedTFA flag set
        * password != "ticket"       -> 200 with the first ticket
        * password == "ticket"       -> 200 with a refreshed ticket/CSRF token
        """
        form_data_dict = dict(parse_qsl(request.body))

        # if this user should not be authenticated
        if form_data_dict.get("username") == "bad_auth":
            return (
                401,
                self.common_headers,
                json.dumps({"data": None}),
            )
        # if this user requires OTP and it is not included
        if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None:
            return (
                200,
                self.common_headers,
                json.dumps(
                    {
                        "data": {
                            "ticket": "otp_ticket",
                            "CSRFPreventionToken": "CSRFPreventionToken",
                            "NeedTFA": 1,
                        }
                    }
                ),
            )

        # if this is the first ticket
        if form_data_dict.get("password") != "ticket":
            return (
                200,
                self.common_headers,
                json.dumps(
                    {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}}
                ),
            )
        # if this is refreshing the ticket, return new ticket
        else:
            return (
                200,
                self.common_headers,
                json.dumps(
                    {
                        "data": {
                            "ticket": "new_ticket",
                            "CSRFPreventionToken": "CSRFPreventionToken_2",
                        }
                    }
                ),
            )
    def _cb_task_status(self, request):
        """Return a canned task-status payload selected by a marker string
        embedded in the requested UPID (its trailing "comment" segment).
        """
        resp = {}
        if "keep-running" in request.url:
            # task that never reaches "stopped" — exercises polling timeouts
            resp = {
                "data": {
                    "id": "110",
                    "pid": 1044989,
                    "node": "node1",
                    "pstart": 284768076,
                    "status": "running",
                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
                    "starttime": 1661825068,
                    "user": "root@pam",
                    "type": "vzdump",
                }
            }
        elif "stopped" in request.url:
            # task that ended abnormally (interrupted)
            resp = {
                "data": {
                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
                    "starttime": 1661825068,
                    "user": "root@pam",
                    "type": "vzdump",
                    "pstart": 284768076,
                    "status": "stopped",
                    "exitstatus": "interrupted by signal",
                    "pid": 1044989,
                    "id": "110",
                    "node": "node1",
                }
            }
        elif "done" in request.url:
            # task that completed successfully
            resp = {
                "data": {
                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
                    "starttime": 1661825068,
                    "user": "root@pam",
                    "type": "vzdump",
                    "pstart": 284768076,
                    "status": "stopped",
                    "exitstatus": "OK",
                    "pid": 1044989,
                    "id": "110",
                    "node": "node1",
                }
            }
        elif "comment" in request.url:
            # minimal all-zero task used for UPID-decoding tests
            resp = {
                "data": {
                    "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
                    "node": "node",
                    "pid": 0,
                    "pstart": 0,
                    "starttime": 0,
                    "type": "task",
                    "id": "id",
                    "user": "root@pam",
                    "status": "stopped",
                    "exitstatus": "OK",
                }
            }
        # NOTE(review): URLs matching none of the markers get an empty JSON
        # object (no "data" key) — presumably never hit by the tests; confirm.
        return (200, self.common_headers, json.dumps(resp))
    def _cb_url_metadata(self, request):
        """Mock query-url-metadata: return file metadata or an error keyed
        off the filename inside the "url" query parameter.
        """
        form_data_dict = dict(parse_qsl((urlparse(request.url)).query))

        # valid ISO image with full metadata
        if "file.iso" in form_data_dict.get("url", ""):
            return (
                200,
                self.common_headers,
                json.dumps(
                    {
                        "data": {
                            "size": 123456,
                            "filename": "file.iso",
                            "mimetype": "application/x-iso9660-image",
                            # "mimetype": "application/octet-stream",
                        },
                        "success": 1,
                    }
                ),
            )
        # target server has a TLS problem
        elif "invalid.iso" in form_data_dict.get("url", ""):
            return (
                500,
                self.common_headers,
                json.dumps(
                    {
                        "status": 500,
                        "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n",
                        "success": 0,
                        "data": None,
                    }
                ),
            )
        # target file does not exist
        elif "missing.iso" in form_data_dict.get("url", ""):
            return (
                500,
                self.common_headers,
                json.dumps(
                    {
                        "status": 500,
                        "success": 0,
                        "message": "invalid server response: '404 Not Found'\n",
                        "data": None,
                    }
                ),
            )
        # non-ISO content (HTML page)
        elif "index.html" in form_data_dict.get("url", ""):
            return (
                200,
                self.common_headers,
                json.dumps(
                    {
                        "success": 1,
                        "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664},
                    }
                ),
            )
        # NOTE(review): falls through returning None for any other URL —
        # presumably unreachable in the tests; confirm before relying on it.
def _cb_qemu_monitor(self, request):
body = request.body
if body is not None:
body = body if isinstance(body, str) else str(body, "utf-8")
# if the command is an array, throw the type error PVE would throw
if "&" in body:
return (
400,
self.common_headers,
json.dumps(
{
"data": None,
"errors": {"command": "type check ('string') failed - got ARRAY"},
}
),
)
else:
resp = {
"method": request.method,
"url": request.url,
"headers": dict(request.headers),
"cookies": request._cookies.get_dict(),
"body": body,
# "body_json": dict(parse_qsl(request.body)),
}
print(resp)
return (200, self.common_headers, json.dumps(resp))
07070100000028000081A4000000000000000000000001675E3B1A00000E5A000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/files_mock.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import re
import pytest
import responses
from requests import exceptions
from .api_mock import PVERegistry
@pytest.fixture()
def mock_files():
    """Yield a RequestsMock pre-loaded with the static file-download registry."""
    with responses.RequestsMock(
        registry=FilesRegistry, assert_all_requests_are_fired=False
    ) as rsps:
        yield rsps
class FilesRegistry(responses.registries.FirstMatchRegistry):
    # Mock registry serving static file downloads (images and their
    # checksum companions) for the file-download tooling tests.

    # Fake host all mocked file URLs live under.
    base_url = "https://sub.domain.tld"

    # Headers returned with every mocked reply.
    common_headers = {
        "Cache-Control": "max-age=0",
        "Connection": "close, Keep-Alive",
        "Pragma": "no-cache",
        "Server": "pve-api-daemon/3.0",
        "Content-Type": "application/json;charset=UTF-8",
    }
    def __init__(self):
        """Register all static mock responses."""
        super().__init__()
        for resp in self._generate_static_responses():
            self.add(resp)
    def _generate_static_responses(self):
        """Build the static file-download mock responses.

        Each group mocks a file plus its checksum companion in one of the
        layouts the checksum-discovery helpers probe (sibling SUMS file,
        ".<alg>" extension, upper-case extension, etc.).
        """
        resps = []

        # Basic GET requests
        resps.append(responses.Response(method="GET", url=self.base_url, body="hello world"))
        resps.append(
            responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS")
        )

        # sibling: checksum in a separate SUMS-style file next to the image
        resps.append(
            responses.Response(
                method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n"
            )
        )
        resps.append(
            responses.Response(
                method="GET",
                url=self.base_url + "/sibling/TESTINGSUMS",
                body="this_is_the_hash file.iso",
            )
        )

        # extension: checksum file named by appending the algorithm suffix
        resps.append(
            responses.Response(
                method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n"
            )
        )
        resps.append(
            responses.Response(
                method="GET",
                url=self.base_url + "/extension/file.iso.testing",
                body="this_is_the_hash file.iso",
            )
        )
        # bodies set to exception instances make the mocked request raise
        # them (simulated network failures)
        resps.append(
            responses.Response(
                method="GET",
                url=self.base_url + "/extension/connectionerror.iso.testing",
                body=exceptions.ConnectionError(),
            )
        )
        resps.append(
            responses.Response(
                method="GET",
                url=self.base_url + "/extension/readtimeout.iso.testing",
                body=exceptions.ReadTimeout(),
            )
        )

        # extension upper: checksum suffix in upper case
        resps.append(
            responses.Response(
                method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n"
            )
        )
        resps.append(
            responses.Response(
                method="GET",
                url=self.base_url + "/upper/file.iso.TESTING",
                body="this_is_the_hash file.iso",
            )
        )

        # oversized dummy digest for any algorithm suffix — presumably
        # callers slice it to the algorithm's hex length; confirm against
        # the checksum helpers
        resps.append(
            responses.Response(
                method="GET",
                url=re.compile(self.base_url + r"/checksums/file.iso.\w+"),
                body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso",
            )
        )
        return resps
@pytest.fixture()
def mock_files_and_pve():
    """Yield a RequestsMock combining the file-download and PVE API registries."""
    with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps:
        yield rsps
class BothRegistry(responses.registries.FirstMatchRegistry):
    """Registry merging the file-download mocks with the PVE API mocks."""

    def __init__(self):
        super().__init__()
        # file mocks are registered first, then the PVE API mocks
        for registry in (FilesRegistry(), PVERegistry()):
            for response in registry.registered:
                self.add(response)
07070100000029000081A4000000000000000000000001675E3B1A00004215000000000000000000000000000000000000002800000000proxmoxer-2.2.0/tests/known_issues.json{
"errors": [],
"generated_at": "2022-08-25T03:08:48Z",
"metrics": {
"_totals": {
"CONFIDENCE.HIGH": 3,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 11,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 3,
"SEVERITY.MEDIUM": 11,
"SEVERITY.UNDEFINED": 0,
"loc": 1947,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/__init__.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 5,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/__init__.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 3,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/command_base.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 115,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/https.py": {
"CONFIDENCE.HIGH": 1,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 1,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 286,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/local.py": {
"CONFIDENCE.HIGH": 2,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 2,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 14,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/openssh.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 53,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/backends/ssh_paramiko.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 1,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 1,
"SEVERITY.UNDEFINED": 0,
"loc": 58,
"nosec": 0,
"skipped_tests": 0
},
"proxmoxer/core.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 155,
"nosec": 0,
"skipped_tests": 0
},
"tests/api_mock.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 108,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_command_base.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 2,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 2,
"SEVERITY.UNDEFINED": 0,
"loc": 195,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_core.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 241,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_https.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 362,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_https_helpers.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 62,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_imports.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 80,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_local.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 35,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_openssh.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 2,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 2,
"SEVERITY.UNDEFINED": 0,
"loc": 62,
"nosec": 0,
"skipped_tests": 0
},
"tests/test_paramiko.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 6,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 6,
"SEVERITY.UNDEFINED": 0,
"loc": 113,
"nosec": 0,
"skipped_tests": 0
}
},
"results": [
{
"code": "332 def get_serializer(self):\n333 assert self.mode == \"json\"\n334 return JsonSerializer()\n",
"col_offset": 8,
"filename": "proxmoxer/backends/https.py",
"issue_confidence": "HIGH",
"issue_cwe": {
"id": 703,
"link": "https://cwe.mitre.org/data/definitions/703.html"
},
"issue_severity": "LOW",
"issue_text": "Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.",
"line_number": 333,
"line_range": [
333
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b101_assert_used.html",
"test_id": "B101",
"test_name": "assert_used"
},
{
"code": "1 import shutil\n2 from subprocess import PIPE, Popen\n3 \n4 from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession\n",
"col_offset": 0,
"filename": "proxmoxer/backends/local.py",
"issue_confidence": "HIGH",
"issue_cwe": {
"id": 78,
"link": "https://cwe.mitre.org/data/definitions/78.html"
},
"issue_severity": "LOW",
"issue_text": "Consider possible security implications associated with the subprocess module.",
"line_number": 2,
"line_range": [
2,
3
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/blacklists/blacklist_imports.html#b404-import-subprocess",
"test_id": "B404",
"test_name": "blacklist"
},
{
"code": "8 def _exec(self, cmd):\n9 proc = Popen(cmd, stdout=PIPE, stderr=PIPE)\n10 stdout, stderr = proc.communicate(timeout=self.timeout)\n",
"col_offset": 15,
"filename": "proxmoxer/backends/local.py",
"issue_confidence": "HIGH",
"issue_cwe": {
"id": 78,
"link": "https://cwe.mitre.org/data/definitions/78.html"
},
"issue_severity": "LOW",
"issue_text": "subprocess call - check for execution of untrusted input.",
"line_number": 9,
"line_range": [
9
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b603_subprocess_without_shell_equals_true.html",
"test_id": "B603",
"test_name": "subprocess_without_shell_equals_true"
},
{
"code": "62 session = self.ssh_client.get_transport().open_session()\n63 session.exec_command(shell_join(cmd))\n64 stdout = session.makefile(\"rb\", -1).read().decode()\n",
"col_offset": 8,
"filename": "proxmoxer/backends/ssh_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 78,
"link": "https://cwe.mitre.org/data/definitions/78.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Possible shell injection via Paramiko call, check inputs are properly sanitized.",
"line_number": 63,
"line_range": [
63
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b601_paramiko_calls.html",
"test_id": "B601",
"test_name": "paramiko_calls"
},
{
"code": "39 with pytest.raises(NotImplementedError), tempfile.TemporaryFile(\"w+b\") as f_obj:\n40 self._session.upload_file_obj(f_obj, \"/tmp/file.iso\")\n41 \n",
"col_offset": 49,
"filename": "tests/test_command_base.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 40,
"line_range": [
40
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "160 \"-tmpfilename\",\n161 \"/tmp/tmpasdfasdf\",\n162 \"--output-format\",\n163 \"json\",\n164 ]\n165 \n166 \n167 class TestJsonSimpleSerializer:\n168 _serializer = command_base.JsonSimpleSerializer()\n169 \n170 def test_loads_pass(self):\n171 input_str = '{\"key1\": \"value1\", \"key2\": \"value2\"}'\n172 exp_output = {\"key1\": \"value1\", \"key2\": \"value2\"}\n173 \n",
"col_offset": 16,
"filename": "tests/test_command_base.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 161,
"line_range": [
152,
153,
154,
155,
156,
157,
158,
159,
160,
161,
162,
163
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "61 with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n62 mock_session.upload_file_obj(f_obj, \"/tmp/file\")\n63 \n",
"col_offset": 48,
"filename": "tests/test_openssh.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 62,
"line_range": [
62
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "65 (f_obj,),\n66 target=\"/tmp/file\",\n67 )\n",
"col_offset": 23,
"filename": "tests/test_openssh.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 66,
"line_range": [
66
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "23 sess = ssh_paramiko.SshParamikoSession(\n24 \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n25 )\n",
"col_offset": 66,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 24,
"line_range": [
24
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "29 assert sess.password == \"password\"\n30 assert sess.private_key_file == \"/tmp/key_file\"\n31 assert sess.port == 1234\n",
"col_offset": 40,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 30,
"line_range": [
30
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "55 sess = ssh_paramiko.SshParamikoSession(\n56 \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n57 )\n",
"col_offset": 66,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 56,
"line_range": [
56
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "63 look_for_keys=True,\n64 key_filename=\"/tmp/key_file\",\n65 password=\"password\",\n",
"col_offset": 25,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 64,
"line_range": [
64
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "110 with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n111 sess.upload_file_obj(f_obj, \"/tmp/file\")\n112 \n",
"col_offset": 40,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 111,
"line_range": [
111
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
},
{
"code": "112 \n113 mock_sftp.putfo.assert_called_once_with(f_obj, \"/tmp/file\")\n114 \n",
"col_offset": 59,
"filename": "tests/test_paramiko.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 377,
"link": "https://cwe.mitre.org/data/definitions/377.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Probable insecure usage of temp file/directory.",
"line_number": 113,
"line_range": [
113
],
"more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
"test_id": "B108",
"test_name": "hardcoded_tmp_directory"
}
]
}0707010000002A000081A4000000000000000000000001675E3B1A0000238E000000000000000000000000000000000000002B00000000proxmoxer-2.2.0/tests/test_command_base.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import tempfile
from unittest import mock
import pytest
from proxmoxer.backends import command_base
from .api_mock import PVERegistry
# pylint: disable=no-self-use
class TestResponse:
    def test_init_all_args(self):
        """A Response stores content/status and derives text, headers, and repr."""
        response = command_base.Response(b"content", 200)

        assert response.content == b"content"
        assert response.text == "b'content'"
        assert response.status_code == 200
        assert response.headers == {"content-type": "application/json"}
        assert str(response) == "Response (200) b'content'"
class TestCommandBaseSession:
base_url = PVERegistry.base_url
_session = command_base.CommandBaseSession()
def test_init_all_args(self):
sess = command_base.CommandBaseSession(service="SERVICE", timeout=10, sudo=True)
assert sess.service == "service"
assert sess.timeout == 10
assert sess.sudo is True
def test_exec(self):
with pytest.raises(NotImplementedError):
self._session._exec("command")
def test_upload_file_obj(self):
with pytest.raises(NotImplementedError), tempfile.TemporaryFile("w+b") as f_obj:
self._session.upload_file_obj(f_obj, "/tmp/file.iso")
def test_request_basic(self, mock_exec):
resp = self._session.request("GET", self.base_url + "/fake/echo")
assert resp.status_code == 200
assert resp.content == [
"pvesh",
"get",
self.base_url + "/fake/echo",
"--output-format",
"json",
]
def test_request_task(self, mock_exec_task):
resp = self._session.request("GET", self.base_url + "/stdout")
assert resp.status_code == 200
assert (
resp.content == "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done"
)
resp_stderr = self._session.request("GET", self.base_url + "/stderr")
assert resp_stderr.status_code == 200
assert (
resp_stderr.content
== "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done"
)
# assert False # DEBUG
def test_request_error(self, mock_exec_err):
resp = self._session.request(
"GET", self.base_url + "/fake/echo", data={"thing": "403 Unauthorized"}
)
assert resp.status_code == 403
assert (
resp.content
== "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\n403 Unauthorized\n--output-format\njson"
)
def test_request_error_generic(self, mock_exec_err):
resp = self._session.request("GET", self.base_url + "/fake/echo", data={"thing": "failure"})
assert resp.status_code == 500
assert (
resp.content
== "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\nfailure\n--output-format\njson"
)
def test_request_sudo(self, mock_exec):
resp = command_base.CommandBaseSession(sudo=True).request(
"GET", self.base_url + "/fake/echo"
)
assert resp.status_code == 200
assert resp.content == [
"sudo",
"pvesh",
"get",
self.base_url + "/fake/echo",
"--output-format",
"json",
]
def test_request_data(self, mock_exec):
resp = self._session.request("GET", self.base_url + "/fake/echo", data={"key": "value"})
assert resp.status_code == 200
assert resp.content == [
"pvesh",
"get",
self.base_url + "/fake/echo",
"-key",
"value",
"--output-format",
"json",
]
def test_request_bytes_data(self, mock_exec):
resp = self._session.request(
"GET", self.base_url + "/fake/echo", data={"key": b"bytes-value"}
)
assert resp.status_code == 200
assert resp.content == [
"pvesh",
"get",
self.base_url + "/fake/echo",
"-key",
"bytes-value",
"--output-format",
"json",
]
def test_request_qemu_exec(self, mock_exec):
resp = self._session.request(
"POST",
self.base_url + "/node/node1/qemu/100/agent/exec",
data={"command": "echo 'hello world'"},
)
assert resp.status_code == 200
assert resp.content == [
"pvesh",
"create",
self.base_url + "/node/node1/qemu/100/agent/exec",
"-command",
"echo",
"-command",
"hello world",
"--output-format",
"json",
]
def test_request_qemu_exec_list(self, mock_exec):
resp = self._session.request(
"POST",
self.base_url + "/node/node1/qemu/100/agent/exec",
data={"command": ["echo", "hello world"]},
)
assert resp.status_code == 200
assert resp.content == [
"pvesh",
"create",
self.base_url + "/node/node1/qemu/100/agent/exec",
"-command",
"echo",
"-command",
"hello world",
"--output-format",
"json",
]
def test_request_upload(self, mock_exec, mock_upload_file_obj):
    """Uploading a file object replaces the value with the local filename and
    adds a -tmpfilename pointing at the (mocked) remote temp file."""
    with tempfile.NamedTemporaryFile("w+b") as f_obj:
        resp = self._session.request(
            "POST",
            self.base_url + "/node/node1/storage/local/upload",
            data={"content": "iso", "filename": f_obj},
        )
        assert resp.status_code == 200
        assert resp.content == [
            "pvesh",
            "create",
            self.base_url + "/node/node1/storage/local/upload",
            "-content",
            "iso",
            "-filename",
            str(f_obj.name),
            "-tmpfilename",
            "/tmp/tmpasdfasdf",  # fixed fake path returned by _exec_echo
            "--output-format",
            "json",
        ]
class TestJsonSimpleSerializer:
    """Tests for JsonSimpleSerializer.loads() parsing and error fallbacks."""

    _serializer = command_base.JsonSimpleSerializer()

    def test_loads_pass(self):
        """A valid JSON body parses into the equivalent dict."""
        raw = '{"key1": "value1", "key2": "value2"}'
        resp = command_base.Response(raw.encode("utf-8"), 200)
        parsed = self._serializer.loads(resp)
        assert parsed == {"key1": "value1", "key2": "value2"}

    def test_loads_not_json(self):
        """A non-JSON body is returned raw under the "errors" key."""
        raw = "There was an error with the request"
        resp = command_base.Response(raw.encode("utf-8"), 200)
        parsed = self._serializer.loads(resp)
        assert parsed == {"errors": raw.encode("utf-8")}

    def test_loads_not_unicode(self):
        """A body with trailing garbage after the JSON also falls back to "errors"."""
        raw = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80'
        resp = command_base.Response(raw.encode("utf-8"), 200)
        parsed = self._serializer.loads(resp)
        assert parsed == {"errors": raw.encode("utf-8")}
class TestCommandBaseBackend:
    """Tests for the CommandBaseBackend accessors."""

    # backend/session pair wired together at class-definition time and
    # shared (read-only) by the accessor tests below
    backend = command_base.CommandBaseBackend()
    sess = command_base.CommandBaseSession()
    backend.session = sess

    def test_init(self):
        """A freshly constructed backend has no session and no target."""
        b = command_base.CommandBaseBackend()
        assert b.session is None
        assert b.target is None

    def test_get_session(self):
        """get_session() returns the session attached to the backend."""
        assert self.backend.get_session() == self.sess

    def test_get_base_url(self):
        """Command backends build no URL prefix."""
        assert self.backend.get_base_url() == ""

    def test_get_serializer(self):
        """Responses are deserialized with JsonSimpleSerializer."""
        assert isinstance(self.backend.get_serializer(), command_base.JsonSimpleSerializer)
@classmethod
def _exec_echo(_, cmd):
    """Fake ``_exec`` replacement: echoes the command back as stdout.

    The one exception is the command that asks the remote host for a
    temporary file name, which gets a fixed fake path instead.
    """
    remote_tmpfile_cmd = [
        "python3",
        "-c",
        "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
    ]
    if cmd == remote_tmpfile_cmd:
        return b"/tmp/tmpasdfasdf", None
    return cmd, None
@classmethod
def _exec_err(_, cmd):
    """Fake ``_exec`` replacement: reports the whole command as stderr."""
    joined = "\n".join(cmd)
    return None, joined
@classmethod
def _exec_task(_, cmd):
    """Fake ``_exec`` replacement: returns a fixed task UPID.

    The UPID goes to stderr when the target path (``cmd[2]``) mentions
    "stderr", otherwise to stdout.
    """
    upid = "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done"
    return (None, upid) if "stderr" in cmd[2] else (upid, None)
@classmethod
def upload_file_obj_echo(_, file_obj, remote_path):
    """Fake ``upload_file_obj`` replacement: returns its arguments untouched."""
    return (file_obj, remote_path)
@pytest.fixture
def mock_upload_file_obj():
    """Patch upload_file_obj to echo its arguments instead of copying data."""
    with mock.patch.object(
        command_base.CommandBaseSession, "upload_file_obj", upload_file_obj_echo
    ):
        yield
@pytest.fixture
def mock_exec():
    """Patch _exec to echo the built command instead of running it."""
    with mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_echo):
        yield
@pytest.fixture
def mock_exec_task():
    """Patch _exec to return a fixed task UPID (stdout or stderr by path)."""
    with mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_task):
        yield
@pytest.fixture
def mock_exec_err():
    """Patch _exec to fail, echoing the command back on stderr."""
    with mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_err):
        yield
0707010000002B000081A4000000000000000000000001675E3B1A00003709000000000000000000000000000000000000002300000000proxmoxer-2.2.0/tests/test_core.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import logging
from unittest import mock
import pytest
from proxmoxer import core
from proxmoxer.backends import https
from proxmoxer.backends.command_base import JsonSimpleSerializer, Response
from .api_mock import ( # pylint: disable=unused-import # noqa: F401
PVERegistry,
mock_pve,
)
from .test_paramiko import mock_ssh_client # pylint: disable=unused-import # noqa: F401
# pylint: disable=no-self-use,protected-access
MODULE_LOGGER_NAME = "proxmoxer.core"
class TestResourceException:
    """Tests for ResourceException construction, str(), and repr()."""

    def test_init_none(self):
        """All-None arguments are stored as-is and rendered literally."""
        e = core.ResourceException(None, None, None)
        assert e.status_code is None
        assert e.status_message is None
        assert e.content is None
        assert e.errors is None
        assert str(e) == "None None: None"
        assert repr(e) == "ResourceException('None None: None')"

    def test_init_basic(self):
        """Without errors, the message is "<code> <status>: <content>"."""
        e = core.ResourceException(500, "Internal Error", "Unable to do the thing")
        assert e.status_code == 500
        assert e.status_message == "Internal Error"
        assert e.content == "Unable to do the thing"
        assert e.errors is None
        assert str(e) == "500 Internal Error: Unable to do the thing"
        assert repr(e) == "ResourceException('500 Internal Error: Unable to do the thing')"

    def test_init_error(self):
        """When errors are given they are appended after " - "."""
        e = core.ResourceException(
            500, "Internal Error", "Unable to do the thing", "functionality not found"
        )
        assert e.status_code == 500
        assert e.status_message == "Internal Error"
        assert e.content == "Unable to do the thing"
        assert e.errors == "functionality not found"
        assert str(e) == "500 Internal Error: Unable to do the thing - functionality not found"
        assert (
            repr(e)
            == "ResourceException('500 Internal Error: Unable to do the thing - functionality not found')"
        )
class TestProxmoxResource:
    """Tests for ProxmoxResource URL building, attribute chaining, and _request."""

    obj = core.ProxmoxResource()
    base_url = "http://example.com/"

    def test_url_join_empty_base(self):
        """Joining empty segments onto an empty base yields "/"."""
        assert "/" == self.obj.url_join("", "")

    def test_url_join_empty(self):
        """An empty segment leaves the base URL (with trailing slash) intact."""
        assert "https://www.example.com:80/" == self.obj.url_join("https://www.example.com:80", "")

    def test_url_join_basic(self):
        """Segments are joined onto the base with "/" separators."""
        assert "https://www.example.com/nodes/node1" == self.obj.url_join(
            "https://www.example.com", "nodes", "node1"
        )

    def test_url_join_all_segments(self):
        """Fragment and query parts of the base URL survive path joining."""
        assert "https://www.example.com/base/path#div1?search=query" == self.obj.url_join(
            "https://www.example.com/base#div1?search=query", "path"
        )

    def test_repr(self):
        """repr() shows the URL path accumulated by chained access/calls."""
        obj = core.ProxmoxResource(base_url="root")
        assert repr(obj.first.second("third")) == "ProxmoxResource (root/first/second/third)"

    def test_getattr_private(self):
        """Underscore-prefixed attributes raise instead of extending the URL."""
        with pytest.raises(AttributeError) as exc_info:
            self.obj._thing
        assert str(exc_info.value) == "_thing"

    def test_getattr_single(self):
        """Attribute access appends one path segment to the base URL."""
        test_obj = core.ProxmoxResource(base_url=self.base_url)
        ret = test_obj.nodes
        assert isinstance(ret, core.ProxmoxResource)
        assert ret._store["base_url"] == self.base_url + "nodes"

    def test_call_basic(self):
        """Calling with a string appends that segment."""
        test_obj = core.ProxmoxResource(base_url=self.base_url)
        ret = test_obj("nodes")
        assert isinstance(ret, core.ProxmoxResource)
        assert ret._store["base_url"] == self.base_url + "nodes"

    def test_call_emptystr(self):
        """Calling with an empty string leaves the URL unchanged."""
        test_obj = core.ProxmoxResource(base_url=self.base_url)
        ret = test_obj("")
        assert isinstance(ret, core.ProxmoxResource)
        assert ret._store["base_url"] == self.base_url

    def test_call_list(self):
        """Calling with a list appends each element as a path segment."""
        test_obj = core.ProxmoxResource(base_url=self.base_url)
        ret = test_obj(["nodes", "node1"])
        assert isinstance(ret, core.ProxmoxResource)
        assert ret._store["base_url"] == self.base_url + "nodes/node1"

    def test_call_stringable(self):
        """Any object with __str__ can serve as a path segment."""
        test_obj = core.ProxmoxResource(base_url=self.base_url)

        class Thing:
            def __str__(self):
                return "string"

        ret = test_obj(Thing())
        assert isinstance(ret, core.ProxmoxResource)
        assert ret._store["base_url"] == self.base_url + "string"

    def test_request_basic_get(self, mock_resource, caplog):
        """A GET logs the URL (INFO) and response (DEBUG), returning parsed JSON."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        ret = mock_resource._request("GET", params={"key": "value"})
        assert caplog.record_tuples == [
            (MODULE_LOGGER_NAME, logging.INFO, "GET " + self.base_url),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"key": "value"}}\'',
            ),
        ]
        assert ret == {"data": {"key": "value"}}

    def test_request_basic_post(self, mock_resource, caplog):
        """A POST additionally logs its data payload at INFO."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        ret = mock_resource._request("POST", data={"key": "value"})
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "POST " + self.base_url + " " + str({"key": "value"}),
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"key": "value"}}\'',
            ),
        ]
        assert ret == {"data": {"key": "value"}}

    def test_request_fail(self, mock_resource, caplog):
        """A 500 without a reason raises ResourceException carrying the raw body."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        with pytest.raises(core.ResourceException) as exc_info:
            mock_resource("fail")._request("GET")
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "GET " + self.base_url + "fail",
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                "Status code: 500, output: b'this is the error'",
            ),
        ]
        assert exc_info.value.status_code == 500
        assert exc_info.value.status_message == "Internal Server Error"
        assert exc_info.value.content == str(b"this is the error")
        assert exc_info.value.errors is None

    def test_request_fail_with_reason(self, mock_resource, caplog):
        """A 500 with a reason uses the reason as content and keeps errors."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        with pytest.raises(core.ResourceException) as exc_info:
            mock_resource(["fail", "reason"])._request("GET")
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "GET " + self.base_url + "fail/reason",
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                "Status code: 500, output: b'this is the error'",
            ),
        ]
        assert exc_info.value.status_code == 500
        assert exc_info.value.status_message == "Internal Server Error"
        assert exc_info.value.content == "this is the reason"
        assert exc_info.value.errors == {"errors": b"this is the error"}

    def test_request_params_cleanup(self, mock_resource):
        """None-valued params are stripped before reaching the session."""
        mock_resource._request("GET", params={"key": "value", "remove_me": None})
        assert mock_resource._store["session"].params == {"key": "value"}

    def test_request_data_cleanup(self, mock_resource):
        """None-valued data entries are stripped before reaching the session."""
        mock_resource._request("POST", data={"key": "value", "remove_me": None})
        assert mock_resource._store["session"].data == {"key": "value"}
class TestProxmoxResourceMethods:
    """Verify the HTTP-verb convenience wrappers on ProxmoxResource."""

    _resource = core.ProxmoxResource(base_url="https://example.com")

    def _assert_request(self, ret, verb, payload_field):
        # Shared checks: correct verb, payload placed in params or data,
        # and the path segment resolved onto the base URL.
        assert ret["method"] == verb
        assert ret[payload_field] == {"key": "value"}
        assert ret["self"]._store["base_url"] == "https://example.com/nodes"

    def test_get(self, mock_private_request):
        """get() issues GET with kwargs as query params."""
        self._assert_request(self._resource.get("nodes", key="value"), "GET", "params")

    def test_post(self, mock_private_request):
        """post() issues POST with kwargs as body data."""
        self._assert_request(self._resource.post("nodes", key="value"), "POST", "data")

    def test_put(self, mock_private_request):
        """put() issues PUT with kwargs as body data."""
        self._assert_request(self._resource.put("nodes", key="value"), "PUT", "data")

    def test_delete(self, mock_private_request):
        """delete() issues DELETE with kwargs as query params."""
        self._assert_request(self._resource.delete("nodes", key="value"), "DELETE", "params")

    def test_create(self, mock_private_request):
        """create() is an alias for POST."""
        self._assert_request(self._resource.create("nodes", key="value"), "POST", "data")

    def test_set(self, mock_private_request):
        """set() is an alias for PUT."""
        self._assert_request(self._resource.set("nodes", key="value"), "PUT", "data")
class TestProxmoxAPI:
    """Tests for ProxmoxAPI construction, repr(), and token retrieval."""

    def test_init_basic(self):
        """Service and backend names are accepted case-insensitively."""
        prox = core.ProxmoxAPI(
            "host", token_name="name", token_value="value", service="pVe", backend="hTtPs"
        )
        assert isinstance(prox, core.ProxmoxAPI)
        assert isinstance(prox, core.ProxmoxResource)
        assert isinstance(prox._backend, https.Backend)
        assert prox._backend.auth.service == "PVE"

    def test_init_invalid_service(self):
        """An unknown service name raises NotImplementedError."""
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="NA")
        assert str(exc_info.value) == "NA service is not supported"

    def test_init_invalid_backend(self):
        """A backend not supported by the service raises NotImplementedError."""
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="pbs", backend="LocaL")
        assert str(exc_info.value) == "PBS service does not support local backend"

    def test_init_local_with_host(self):
        """The local backend rejects an explicit host argument."""
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="pve", backend="LocaL")
        assert str(exc_info.value) == "local backend does not support host keyword"

    def test_repr_https(self):
        """repr() for HTTPS shows the full API base URL."""
        prox = core.ProxmoxAPI("host", token_name="name", token_value="value", backend="hTtPs")
        assert repr(prox) == "ProxmoxAPI (https backend for https://host:8006/api2/json)"

    def test_repr_local(self):
        """repr() for local shows "localhost"."""
        prox = core.ProxmoxAPI(backend="local")
        assert repr(prox) == "ProxmoxAPI (local backend for localhost)"

    def test_repr_openssh(self):
        """repr() for openssh shows the target host."""
        prox = core.ProxmoxAPI("host", user="user", backend="openssh")
        assert repr(prox) == "ProxmoxAPI (openssh backend for host)"

    def test_repr_paramiko(self, mock_ssh_client):
        """repr() for paramiko shows the target host."""
        prox = core.ProxmoxAPI("host", user="user", backend="ssh_paramiko")
        assert repr(prox) == "ProxmoxAPI (ssh_paramiko backend for host)"

    def test_get_tokens_https(self, mock_pve):
        """HTTPS password auth exposes the auth ticket and CSRF token."""
        prox = core.ProxmoxAPI("1.2.3.4:1234", user="user", password="password", backend="https")
        ticket, csrf = prox.get_tokens()
        assert ticket == "ticket"
        assert csrf == "CSRFPreventionToken"

    def test_get_tokens_local(self):
        """Non-HTTPS backends have no tokens to expose."""
        prox = core.ProxmoxAPI(service="pve", backend="local")
        ticket, csrf = prox.get_tokens()
        assert ticket is None
        assert csrf is None

    def test_init_with_cert(self):
        """A single client-cert path is propagated to backend and session."""
        prox = core.ProxmoxAPI(
            "host",
            token_name="name",
            token_value="value",
            service="pVe",
            backend="hTtPs",
            cert="somepem",
        )
        assert isinstance(prox, core.ProxmoxAPI)
        assert isinstance(prox, core.ProxmoxResource)
        assert isinstance(prox._backend, https.Backend)
        assert prox._backend.auth.service == "PVE"
        assert prox._backend.cert == "somepem"
        assert prox._store["session"].cert == "somepem"

    def test_init_with_cert_key(self):
        """A (cert, key) tuple is propagated to backend and session."""
        prox = core.ProxmoxAPI(
            "host",
            token_name="name",
            token_value="value",
            service="pVe",
            backend="hTtPs",
            cert=("somepem", "somekey"),
        )
        assert isinstance(prox, core.ProxmoxAPI)
        assert isinstance(prox, core.ProxmoxResource)
        assert isinstance(prox._backend, https.Backend)
        assert prox._backend.auth.service == "PVE"
        assert prox._backend.cert == ("somepem", "somekey")
        assert prox._store["session"].cert == ("somepem", "somekey")
class MockSession:
    """Stand-in session that records its call arguments for inspection.

    URLs containing "fail" get a 500 response (with a reason when the URL
    also contains "reason"); all other URLs get a canned 200 JSON body.
    """

    def request(self, method, url, data=None, params=None):
        # Record the call so tests can assert on what was sent.
        self.method = method
        self.url = url
        self.data = data
        self.params = params

        if "fail" not in url:
            return Response(b'{"data": {"key": "value"}}', 200)

        resp = Response(b"this is the error", 500)
        if "reason" in url:
            resp.reason = "this is the reason"
        return resp
@pytest.fixture
def mock_private_request():
    """Patch ProxmoxResource._request to echo its arguments as a dict."""
    def mock_request(self, method, data=None, params=None):
        return {"self": self, "method": method, "data": data, "params": params}
    with mock.patch("proxmoxer.core.ProxmoxResource._request", mock_request):
        yield
@pytest.fixture
def mock_resource():
    """Return a ProxmoxResource wired to a recording MockSession."""
    return core.ProxmoxResource(
        session=MockSession(), base_url="http://example.com/", serializer=JsonSimpleSerializer()
    )
0707010000002C000081A4000000000000000000000001675E3B1A00004F36000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/test_https.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import logging
import re
import sys
import tempfile
from unittest import mock
import pytest
from requests import Request, Response
import proxmoxer as core
from proxmoxer.backends import https
from .api_mock import ( # pylint: disable=unused-import # noqa: F401
PVERegistry,
mock_pve,
)
# pylint: disable=no-self-use
MODULE_LOGGER_NAME = "proxmoxer.backends.https"
class TestHttpsBackend:
    """
    Tests for the proxmox.backends.https file.
    Only tests the Backend class for correct setting of
    variables and selection of auth class.
    Other classes are separately tested.
    """

    def test_init_no_auth(self):
        """Constructing without any credentials raises NotImplementedError."""
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234")
        assert str(exc_info.value) == "No valid authentication credentials were supplied"

    def test_init_ip4_separate_port(self):
        backend = https.Backend("1.2.3.4", port=1234, token_name="")
        exp_base_url = "https://1.2.3.4:1234/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_ip4_inline_port(self):
        backend = https.Backend("1.2.3.4:1234", token_name="")
        exp_base_url = "https://1.2.3.4:1234/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_ip6_separate_port(self):
        """Bare IPv6 hosts are bracketed in the resulting URL."""
        backend = https.Backend("2001:db8::1:2:3:4", port=1234, token_name="")
        exp_base_url = "https://[2001:db8::1:2:3:4]:1234/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_ip6_brackets_separate_port(self):
        """Already-bracketed IPv6 hosts are used as-is."""
        backend = https.Backend("[2001:0db8::1:2:3:4]", port=1234, token_name="")
        exp_base_url = "https://[2001:0db8::1:2:3:4]:1234/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_ip6_inline_port(self):
        backend = https.Backend("[2001:db8::1:2:3:4]:1234", token_name="")
        exp_base_url = "https://[2001:db8::1:2:3:4]:1234/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_ip4_no_port(self):
        """Without an explicit port the PVE default 8006 is used."""
        backend = https.Backend("1.2.3.4", token_name="")
        exp_base_url = "https://1.2.3.4:8006/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_path_prefix(self):
        """path_prefix is inserted between host and /api2/json."""
        backend = https.Backend("1.2.3.4:1234", path_prefix="path", token_name="")
        exp_base_url = "https://1.2.3.4:1234/path/api2/json"
        assert backend.get_base_url() == exp_base_url

    def test_init_token_pass(self):
        """Providing a token name selects API-token authentication."""
        backend = https.Backend("1.2.3.4:1234", token_name="name")
        assert isinstance(backend.auth, https.ProxmoxHTTPApiTokenAuth)

    def test_init_token_not_supported(self, apply_none_service):
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234", token_name="name", service="NONE")
        assert str(exc_info.value) == "NONE does not support API Token authentication"

    def test_init_password_not_supported(self, apply_none_service):
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234", password="pass", service="NONE")
        assert str(exc_info.value) == "NONE does not support password authentication"

    def test_get_tokens_api_token(self):
        """API-token auth has no ticket/CSRF pair to expose."""
        backend = https.Backend("1.2.3.4:1234", token_name="name")
        assert backend.get_tokens() == (None, None)

    def test_get_tokens_password(self, mock_pve):
        """Password auth exposes the ticket and CSRF token from login."""
        backend = https.Backend("1.2.3.4:1234", password="name")
        assert ("ticket", "CSRFPreventionToken") == backend.get_tokens()

    def test_verify_ssl_token(self):
        """SSL verification defaults to on."""
        backend = https.Backend("1.2.3.4:1234", token_name="name")
        assert backend.auth.verify_ssl is True

    def test_verify_ssl_false_token(self):
        backend = https.Backend("1.2.3.4:1234", token_name="name", verify_ssl=False)
        assert backend.auth.verify_ssl is False

    def test_verify_ssl_password(self, mock_pve):
        backend = https.Backend("1.2.3.4:1234", password="name")
        assert backend.auth.verify_ssl is True

    def test_verify_ssl_false_password(self, mock_pve):
        backend = https.Backend("1.2.3.4:1234", password="name", verify_ssl=False)
        assert backend.auth.verify_ssl is False
class TestProxmoxHTTPAuthBase:
    """
    Tests the ProxmoxHTTPAuthBase class
    """

    base_url = PVERegistry.base_url

    def test_init_all_args(self):
        """Constructor stores timeout, service, and verify_ssl."""
        auth = https.ProxmoxHTTPAuthBase(timeout=1234, service="PMG", verify_ssl=True)
        assert auth.timeout == 1234
        assert auth.service == "PMG"
        assert auth.verify_ssl is True

    def test_call(self):
        """The base auth is a no-op: the prepared request passes through unchanged."""
        auth = https.ProxmoxHTTPAuthBase()
        req = Request("HEAD", self.base_url + "/version").prepare()
        resp = auth(req)
        assert resp == req

    def test_get_cookies(self):
        """The base auth contributes no cookies."""
        auth = https.ProxmoxHTTPAuthBase()
        assert auth.get_cookies().get_dict() == {}
class TestProxmoxHTTPApiTokenAuth:
    """
    Tests the ProxmoxHTTPApiTokenAuth class
    """

    base_url = PVERegistry.base_url

    def test_init_all_args(self):
        """Constructor stores user, token pair, service, timeout, verify_ssl."""
        auth = https.ProxmoxHTTPApiTokenAuth(
            "user", "name", "value", service="PMG", timeout=1234, verify_ssl=True
        )
        assert auth.username == "user"
        assert auth.token_name == "name"
        assert auth.token_value == "value"
        assert auth.service == "PMG"
        assert auth.timeout == 1234
        assert auth.verify_ssl is True

    def test_call_pve(self):
        """PVE token auth uses "=" between token name and value."""
        auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PVE")
        req = Request("HEAD", self.base_url + "/version").prepare()
        resp = auth(req)
        assert resp.headers["Authorization"] == "PVEAPIToken=user!name=value"

    def test_call_pbs(self):
        """PBS token auth uses ":" between token name and value."""
        auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PBS")
        req = Request("HEAD", self.base_url + "/version").prepare()
        resp = auth(req)
        assert resp.headers["Authorization"] == "PBSAPIToken=user!name:value"
class TestProxmoxHTTPAuth:
    """
    Tests the ProxmoxHTTPAuth class
    """
    # NOTE: docstring previously said "ProxmoxHTTPApiTokenAuth" (copy-paste
    # from the class above); this class exercises password/ticket auth.

    base_url = PVERegistry.base_url

    # pylint: disable=redefined-outer-name
    def test_init_all_args(self, mock_pve):
        """Login happens at construction; ticket and CSRF token are stored."""
        auth = https.ProxmoxHTTPAuth(
            "otp",
            "password",
            otp="otp",
            base_url=self.base_url,
            service="PMG",
            timeout=1234,
            verify_ssl=True,
        )
        assert auth.username == "otp"
        assert auth.pve_auth_ticket == "ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken"
        assert auth.service == "PMG"
        assert auth.timeout == 1234
        assert auth.verify_ssl is True

    def test_ticket_renewal(self, mock_pve):
        """An expired ticket is transparently renewed on the next request."""
        auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url)
        auth(Request("HEAD", self.base_url + "/version").prepare())
        # check starting auth tokens
        assert auth.pve_auth_ticket == "ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken"
        auth.renew_age = 0  # force renewing ticket now
        auth(Request("GET", self.base_url + "/version").prepare())
        # check renewed auth tokens
        assert auth.pve_auth_ticket == "new_ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken_2"

    def test_get_cookies(self, mock_pve):
        """The auth ticket is exposed as the PVEAuthCookie."""
        auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url, service="PVE")
        assert auth.get_cookies().get_dict() == {"PVEAuthCookie": "ticket"}

    def test_auth_failure(self, mock_pve):
        """A rejected login raises AuthenticationError naming user and URL."""
        with pytest.raises(core.AuthenticationError) as exc_info:
            https.ProxmoxHTTPAuth("bad_auth", "", base_url=self.base_url)
        assert (
            str(exc_info.value)
            == f"Couldn't authenticate user: bad_auth to {self.base_url}/access/ticket"
        )
        assert (
            repr(exc_info.value)
            == f'AuthenticationError("Couldn\'t authenticate user: bad_auth to {self.base_url}/access/ticket")'
        )

    def test_auth_otp(self, mock_pve):
        """Login succeeds when the required OTP is supplied."""
        https.ProxmoxHTTPAuth(
            "otp", "password", base_url=self.base_url, otp="123456", service="PVE"
        )

    def test_auth_otp_missing(self, mock_pve):
        """A missing OTP for a TFA-enabled user raises AuthenticationError."""
        with pytest.raises(core.AuthenticationError) as exc_info:
            https.ProxmoxHTTPAuth("otp", "password", base_url=self.base_url, service="PVE")
        assert (
            str(exc_info.value)
            == "Couldn't authenticate user: missing Two Factor Authentication (TFA)"
        )
        assert (
            repr(exc_info.value)
            == 'AuthenticationError("Couldn\'t authenticate user: missing Two Factor Authentication (TFA)")'
        )
class TestProxmoxHttpSession:
    """
    Tests the ProxmoxHttpSession class
    """

    base_url = PVERegistry.base_url
    _session = https.Backend("1.2.3.4", token_name="").get_session()

    def test_request_basic(self, mock_pve):
        """A bare GET sends no body and the serializer's accept header."""
        resp = self._session.request("GET", self.base_url + "/fake/echo")
        content = resp.json()
        assert self._session.cert is None
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert content["body"] is None
        assert content["headers"]["accept"] == https.JsonSerializer().get_accept_types()

    def test_request_data(self, mock_pve):
        """dict data is form-urlencoded into the request body."""
        resp = self._session.request("GET", self.base_url + "/fake/echo", data={"key": "value"})
        content = resp.json()
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert content["body"] == "key=value"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_monitor_command_list(self, mock_pve):
        """The monitor endpoint rejects list-valued commands (HTTP 400)."""
        resp = self._session.request(
            "GET",
            self.base_url + "/nodes/node_name/qemu/100/monitor",
            data={"command": ["info", "block"]},
        )
        assert resp.status_code == 400

    def test_request_exec_command_list(self, mock_pve):
        """agent/exec list commands become repeated command= form fields."""
        resp = self._session.request(
            "GET",
            self.base_url + "/nodes/node_name/qemu/100/agent/exec",
            data={"command": ["echo", "hello", "world"]},
        )
        content = resp.json()
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        assert content["body"] == "command=echo&command=hello&command=world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_monitor_command_string(self, mock_pve):
        """monitor string commands stay a single urlencoded value."""
        resp = self._session.request(
            "GET",
            self.base_url + "/nodes/node_name/qemu/100/monitor",
            data={"command": "echo hello world"},
        )
        content = resp.json()
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/monitor"
        assert content["body"] == "command=echo+hello+world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_exec_command_string(self, mock_pve):
        """agent/exec string commands are split into repeated command= fields."""
        resp = self._session.request(
            "GET",
            self.base_url + "/nodes/node_name/qemu/100/agent/exec",
            data={"command": "echo hello world"},
        )
        content = resp.json()
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        assert content["body"] == "command=echo&command=hello&command=world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_file(self, mock_pve):
        """A small file value is sent as a multipart/form-data body."""
        size = 10
        content = {}
        with tempfile.TemporaryFile("w+b") as f_obj:
            f_obj.write(b"a" * size)
            f_obj.seek(0)
            resp = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": f_obj})
            content = resp.json()
        # decode multipart file
        body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n'
        m = re.match(body_regex, content["body"])
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]

    def test_request_streaming(self, toolbelt_on_off, caplog, mock_pve):
        """Files above the streaming threshold upload intact; without
        requests_toolbelt a hint about reduced memory use is logged."""
        caplog.set_level(logging.INFO, logger=MODULE_LOGGER_NAME)
        size = https.STREAMING_SIZE_THRESHOLD + 1
        content = {}
        with tempfile.TemporaryFile("w+b") as f_obj:
            f_obj.write(b"a" * size)
            f_obj.seek(0)
            resp = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": f_obj})
            content = resp.json()
        # decode multipart file
        body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n'
        m = re.match(body_regex, content["body"])
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]
        if not toolbelt_on_off:
            assert caplog.record_tuples == [
                (
                    MODULE_LOGGER_NAME,
                    logging.INFO,
                    "Installing 'requests_toolbelt' will decrease memory used during upload",
                )
            ]

    def test_request_large_file(self, shrink_thresholds, toolbelt_on_off, caplog, mock_pve):
        """Above the SSL overflow threshold: with requests_toolbelt the upload
        succeeds; without it an OverflowError is raised and a warning logged."""
        size = https.SSL_OVERFLOW_THRESHOLD + 1
        content = {}
        with tempfile.TemporaryFile("w+b") as f_obj:
            f_obj.write(b"a" * size)
            f_obj.seek(0)
            if toolbelt_on_off:
                resp = self._session.request(
                    "GET", self.base_url + "/fake/echo", data={"iso": f_obj}
                )
                content = resp.json()
                # decode multipart file
                body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n'
                m = re.match(body_regex, content["body"])
                assert content["method"] == "GET"
                assert content["url"] == self.base_url + "/fake/echo"
                assert m is not None  # content matches multipart for the created file
                assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]
            else:
                # forcing an ImportError
                with pytest.raises(OverflowError) as exc_info:
                    resp = self._session.request(
                        "GET", self.base_url + "/fake/echo", data={"iso": f_obj}
                    )
                assert str(exc_info.value) == "Unable to upload a payload larger than 2 GiB"
                assert caplog.record_tuples == [
                    (
                        MODULE_LOGGER_NAME,
                        logging.WARNING,
                        "Install 'requests_toolbelt' to add support for files larger than 2GiB",
                    )
                ]

    def test_request_filename(self, mock_pve):
        """Plain string file content is sent as a named multipart part."""
        resp = self._session.request(
            "GET",
            self.base_url + "/fake/echo",
            files={"file1": "content"},
            serializer=https.JsonSerializer,
        )
        content = resp.json()
        # decode multipart file
        body_regex = '--([0-9a-f]*)\r\nContent-Disposition: form-data; name="file1"; filename="file1"\r\n\r\ncontent\r\n--\\1--\r\n'
        m = re.match(body_regex, content["body"])
        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]
# pylint: disable=protected-access
class TestJsonSerializer:
    """Tests for https.JsonSerializer loads()/loads_errors() behavior."""

    _serializer = https.JsonSerializer()

    def test_get_accept_types(self):
        """The accept header lists all JSON/JavaScript media types."""
        ctypes = "application/json, application/x-javascript, text/javascript, text/x-javascript, text/x-json"
        assert ctypes == self._serializer.get_accept_types()

    def test_loads_pass(self):
        """loads() returns the "data" portion of a valid JSON body."""
        input_str = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}'
        exp_output = {"key1": "value1", "key2": "value2"}
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads(response)
        assert act_output == exp_output

    def test_loads_not_json(self):
        """A non-JSON body is returned raw under the "errors" key."""
        input_str = "There was an error with the request"
        exp_output = {"errors": b"There was an error with the request"}
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads(response)
        assert act_output == exp_output

    def test_loads_not_unicode(self):
        """Trailing garbage after the JSON falls back to the raw "errors" form."""
        input_str = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80'
        exp_output = {"errors": input_str.encode("utf-8")}
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads(response)
        assert act_output == exp_output

    def test_loads_errors_pass(self):
        """loads_errors() returns the "errors" portion of a valid JSON body."""
        input_str = (
            '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}'
        )
        exp_output = ["missing required param 1", "missing required param 2"]
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads_errors(response)
        assert act_output == exp_output

    def test_loads_errors_not_json(self):
        """A malformed JSON body is returned raw under the "errors" key."""
        input_str = (
            '{"data": {} "errors": ["missing required param 1", "missing required param 2"]}'
        )
        exp_output = {
            "errors": b'{"data": {} "errors": ["missing required param 1", "missing required param 2"]}'
        }
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads_errors(response)
        assert act_output == exp_output

    def test_loads_errors_not_unicode(self):
        """Trailing garbage after the JSON falls back to the raw "errors" form."""
        input_str = (
            '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}\x80'
        )
        exp_output = {"errors": input_str.encode("utf-8")}
        response = Response()
        response._content = input_str.encode("utf-8")
        act_output = self._serializer.loads_errors(response)
        assert act_output == exp_output
@pytest.fixture(params=(False, True))
def toolbelt_on_off(request, monkeypatch):
    """
    runs test twice, once with importing of 'requests_toolbelt' to be allowed
    and one with it disabled. Returns True if module is available, False if blocked.
    """
    if not request.param:
        # ran once with requests_toolbelt available and once with it removed
        # (setting the sys.modules entry to None makes a later import fail)
        monkeypatch.setitem(sys.modules, "requests_toolbelt", None)
    return request.param
@pytest.fixture
def shrink_thresholds():
    """Shrink the upload size thresholds so tests avoid multi-GiB fixtures."""
    with mock.patch("proxmoxer.backends.https.STREAMING_SIZE_THRESHOLD", 100), mock.patch(
        "proxmoxer.backends.https.SSL_OVERFLOW_THRESHOLD", 1000
    ):
        yield
@pytest.fixture
def apply_none_service():
    """Register a fake "NONE" service that supports no backends or auths,
    patched into both core and https SERVICES tables."""
    serv = {
        "NONE": {
            "supported_backends": [],
            "supported_https_auths": [],
            "default_port": 1234,
        }
    }
    with mock.patch("proxmoxer.core.SERVICES", serv), mock.patch(
        "proxmoxer.backends.https.SERVICES", serv
    ):
        yield
0707010000002D000081A4000000000000000000000001675E3B1A0000099F000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/tests/test_https_helpers.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import tempfile
from proxmoxer.backends import https
class TestGetFileSize:
    """
    Tests for the get_file_size() function within proxmoxer.backends.https
    """

    def test_empty(self):
        # a freshly created file has zero length
        with tempfile.TemporaryFile("w+b") as handle:
            assert https.get_file_size(handle) == 0

    def test_small(self):
        n_bytes = 100
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            assert https.get_file_size(handle) == n_bytes

    def test_large(self):
        n_bytes = 10 * 1024 * 1024  # 10 MB
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            assert https.get_file_size(handle) == n_bytes

    def test_half_seek(self):
        # full size must be reported regardless of the current cursor position
        n_bytes = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(int(n_bytes / 2))
            assert https.get_file_size(handle) == n_bytes

    def test_full_seek(self):
        # cursor at EOF must not affect the reported size
        n_bytes = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(n_bytes)
            assert https.get_file_size(handle) == n_bytes
class TestGetFileSizePartial:
    """
    Tests for the get_file_size_partial() function within proxmoxer.backends.https
    """

    def test_empty(self):
        with tempfile.TemporaryFile("w+b") as handle:
            assert https.get_file_size_partial(handle) == 0

    def test_small(self):
        n_bytes = 100
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(0)
            assert https.get_file_size_partial(handle) == n_bytes

    def test_large(self):
        n_bytes = 10 * 1024 * 1024  # 10 MB
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(0)
            assert https.get_file_size_partial(handle) == n_bytes

    def test_half_seek(self):
        # with the cursor at the midpoint, only the remaining half counts
        n_bytes = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(int(n_bytes / 2))
            assert https.get_file_size_partial(handle) == n_bytes / 2

    def test_full_seek(self):
        # cursor at EOF leaves nothing remaining to read
        n_bytes = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * n_bytes)
            handle.seek(n_bytes)
            assert https.get_file_size_partial(handle) == 0
0707010000002E000081A4000000000000000000000001675E3B1A00000FC3000000000000000000000000000000000000002600000000proxmoxer-2.2.0/tests/test_imports.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import logging
import sys
from importlib import reload
import pytest
def test_missing_requests(requests_off, caplog):
    """The https backend should log an error and exit(1) when 'requests' is unavailable."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.https as test_https

        # force re-importing of the module with `requests` gone so the validation is triggered
        reload(test_https)

    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [
        (
            "proxmoxer.backends.https",
            logging.ERROR,
            "Chosen backend requires 'requests' module\n",
        )
    ]
def test_missing_requests_tools_files(requests_off, caplog):
    """The Files tool should log an error and exit(1) when 'requests' is unavailable."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.tools.files as test_files

        # force re-importing of the module with `requests` gone so the validation is triggered
        reload(test_files)

    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [
        (
            "proxmoxer.tools.files",
            logging.ERROR,
            "Files tools requires 'requests' module\n",
        )
    ]
def test_missing_openssh_wrapper(openssh_off, caplog):
    """The openssh backend should log an error and exit(1) when 'openssh_wrapper' is unavailable."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.openssh as test_openssh

        # force re-importing of the module with `openssh_wrapper` gone so the validation is triggered
        reload(test_openssh)

    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [
        (
            "proxmoxer.backends.openssh",
            logging.ERROR,
            "Chosen backend requires 'openssh_wrapper' module\n",
        )
    ]
def test_missing_paramiko_off(paramiko_off, caplog):
    """The paramiko backend should log an error and exit(1) when 'paramiko' is unavailable."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.ssh_paramiko as ssh_paramiko

        # force re-importing of the module with `ssh_paramiko` gone so the validation is triggered
        reload(ssh_paramiko)

    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [
        (
            "proxmoxer.backends.ssh_paramiko",
            logging.ERROR,
            "Chosen backend requires 'paramiko' module\n",
        )
    ]
class TestCommandBase:
    """Tests for command_base.shell_join(), run both with and without shlex.join
    available (via the shlex_join_on_off fixture)."""

    def _fresh_command_base(self):
        """Re-import command_base so it re-detects whether shlex.join exists."""
        from proxmoxer.backends import command_base

        reload(command_base)
        return command_base

    def test_join_empty(self, shlex_join_on_off):
        assert self._fresh_command_base().shell_join([]) == ""

    def test_join_single(self, shlex_join_on_off):
        assert self._fresh_command_base().shell_join(["echo"]) == "echo"

    def test_join_multiple(self, shlex_join_on_off):
        assert self._fresh_command_base().shell_join(["echo", "test"]) == "echo test"

    def test_join_complex(self, shlex_join_on_off):
        # an argument containing quotes must itself be quoted
        joined = self._fresh_command_base().shell_join(["echo", 'hello "world"'])
        assert joined == "echo 'hello \"world\"'"
@pytest.fixture()
def requests_off(monkeypatch):
    # simulate 'requests' being uninstalled by masking it in sys.modules
    return monkeypatch.setitem(sys.modules, "requests", None)
@pytest.fixture()
def openssh_off(monkeypatch):
    # simulate 'openssh_wrapper' being uninstalled by masking it in sys.modules
    return monkeypatch.setitem(sys.modules, "openssh_wrapper", None)
@pytest.fixture()
def paramiko_off(monkeypatch):
    # simulate 'paramiko' being uninstalled by masking it in sys.modules
    return monkeypatch.setitem(sys.modules, "paramiko", None)
@pytest.fixture(params=(False, True))
def shlex_join_on_off(request, monkeypatch):
    """
    Parametrized fixture: runs the test twice, once with shlex.join available
    and once with it removed (emulating Python < 3.8). Returns True if
    available, False if blocked.
    """
    join_available = request.param
    if not join_available:
        shlex_module = sys.modules["shlex"]
        # join may already be absent (py < 3.8); only delete it when present
        if getattr(shlex_module, "join", None):
            monkeypatch.delattr(shlex_module, "join")
    return join_available
0707010000002F000081A4000000000000000000000001675E3B1A00000635000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/test_local.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import tempfile
from proxmoxer.backends import local
# pylint: disable=no-self-use
class TestLocalBackend:
    """Tests for the local backend constructor."""

    def test_init(self):
        backend = local.Backend()

        assert isinstance(backend.session, local.LocalSession)
        assert backend.target == "localhost"
class TestLocalSession:
    """Tests for LocalSession file upload and local command execution."""

    _session = local.LocalSession()

    def test_upload_file_obj(self):
        payload = b"a" * 100
        with tempfile.NamedTemporaryFile("w+b") as src, tempfile.NamedTemporaryFile(
            "rb"
        ) as dest:
            src.write(payload)
            src.seek(0)
            self._session.upload_file_obj(src, dest.name)

            # reset file cursor to start of file after copy
            src.seek(0)
            assert src.read() == dest.read()

    def test_upload_file_obj_end(self):
        with tempfile.NamedTemporaryFile("w+b") as src, tempfile.NamedTemporaryFile(
            "rb"
        ) as dest:
            src.write(b"a" * 100)
            # cursor deliberately left at EOF, so nothing should be copied
            self._session.upload_file_obj(src, dest.name)

            assert b"" == dest.read()

    def test_exec(self):
        script = 'import sys; sys.stdout.write("stdout content"); sys.stderr.write("stderr content")'
        stdout, stderr = self._session._exec(["python3", "-c", script])

        assert stdout == "stdout content"
        assert stderr == "stderr content"
07070100000030000081A4000000000000000000000001675E3B1A000009F1000000000000000000000000000000000000002600000000proxmoxer-2.2.0/tests/test_openssh.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import tempfile
from unittest import mock
import openssh_wrapper
import pytest
from proxmoxer.backends import openssh
# pylint: disable=no-self-use
class TestOpenSSHBackend:
    """Tests for the openssh backend constructor."""

    def test_init(self):
        backend = openssh.Backend("host", "user")

        assert isinstance(backend.session, openssh.OpenSSHSession)
        assert backend.session.host == "host"
        assert backend.session.user == "user"
        assert backend.target == "host"
class TestOpenSSHSession:
    """Tests for OpenSSHSession construction, command execution, and uploads."""

    # shared session for tests that do not need a mocked connection
    _session = openssh.OpenSSHSession("host", "user")

    def test_init_all_args(self):
        """All constructor arguments should be stored on the session."""
        with tempfile.NamedTemporaryFile("r") as conf_obj, tempfile.NamedTemporaryFile(
            "r"
        ) as ident_obj:
            sess = openssh.OpenSSHSession(
                "host",
                "user",
                config_file=conf_obj.name,
                port=123,
                identity_file=ident_obj.name,
                forward_ssh_agent=True,
            )

            assert sess.host == "host"
            assert sess.user == "user"
            assert sess.config_file == conf_obj.name
            assert sess.port == 123
            assert sess.identity_file == ident_obj.name
            assert sess.forward_ssh_agent is True

    def test_exec(self, mock_session):
        """_exec should shell-join the arguments and pass forward_ssh_agent through."""
        cmd = [
            "echo",
            "hello",
            "world",
        ]
        stdout, stderr = mock_session._exec(cmd)

        # canned values come from the mocked SSH connection (see _get_mock_ssh_conn)
        assert stdout == "stdout content"
        assert stderr == "stderr content"
        mock_session.ssh_client.run.assert_called_once_with(
            "echo hello world",
            forward_ssh_agent=True,
        )

    def test_upload_file_obj(self, mock_session):
        """upload_file_obj should scp the file object to the target path."""
        with tempfile.NamedTemporaryFile("r") as f_obj:
            mock_session.upload_file_obj(f_obj, "/tmp/file")

            mock_session.ssh_client.scp.assert_called_once_with(
                (f_obj,),
                target="/tmp/file",
            )
@pytest.fixture
def mock_session():
    """An OpenSSHSession whose _connect() is replaced with a mocked SSH connection."""
    patcher = mock.patch("proxmoxer.backends.openssh.OpenSSHSession._connect", _get_mock_ssh_conn)
    with patcher:
        yield openssh.OpenSSHSession("host", "user", forward_ssh_agent=True)
def _get_mock_ssh_conn(_):
    """Build a mock SSHConnection whose run() returns canned stdout/stderr and
    whose scp() is a plain stub (signature matches OpenSSHSession._connect)."""
    conn = mock.Mock(spec=openssh_wrapper.SSHConnection)
    canned_result = mock.Mock(stdout="stdout content", stderr="stderr content")
    conn.run = mock.Mock(return_value=canned_result)
    conn.scp = mock.Mock()
    return conn
07070100000031000081A4000000000000000000000001675E3B1A0000144B000000000000000000000000000000000000002700000000proxmoxer-2.2.0/tests/test_paramiko.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import os.path
import tempfile
from unittest import mock
import pytest
from proxmoxer.backends import ssh_paramiko
# pylint: disable=no-self-use
class TestParamikoBackend:
    """Tests for the paramiko backend constructor."""

    def test_init(self, mock_connect):
        backend = ssh_paramiko.Backend("host", "user")

        assert isinstance(backend.session, ssh_paramiko.SshParamikoSession)
        assert backend.session.host == "host"
        assert backend.session.user == "user"
        assert backend.target == "host"
class TestSshParamikoSession:
    """Tests for SshParamikoSession construction, connection setup, command
    execution, and file upload (all against mocked paramiko objects)."""

    def test_init_all_args(self, mock_connect):
        """All constructor arguments should be stored on the session."""
        sess = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="/tmp/key_file", port=1234
        )

        assert sess.host == "host"
        assert sess.user == "user"
        assert sess.password == "password"
        assert sess.private_key_file == "/tmp/key_file"
        assert sess.port == 1234
        # ssh_client is whatever the (mocked) _connect returned
        assert sess.ssh_client == mock_connect()

    def test_connect_basic(self, mock_ssh_client):
        """Connecting without a key file passes key_filename=None to paramiko."""
        import paramiko

        sess = ssh_paramiko.SshParamikoSession("host", "user", password="password", port=1234)

        sess.ssh_client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename=None,
            password="password",
            timeout=5,
            port=1234,
        )
        # unknown host keys should be auto-added
        policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy)

    def test_connect_key_file(self, mock_ssh_client):
        """An absolute private_key_file is passed through unchanged."""
        import paramiko

        sess = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="/tmp/key_file", port=1234
        )

        sess.ssh_client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename="/tmp/key_file",
            password="password",
            timeout=5,
            port=1234,
        )
        policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy)

    def test_connect_key_file_user(self, mock_ssh_client):
        """A '~'-prefixed private_key_file is expanded to the user's home directory."""
        import paramiko

        sess = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="~/key_file", port=1234
        )

        sess.ssh_client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename=os.path.expanduser("~") + "/key_file",
            password="password",
            timeout=5,
            port=1234,
        )
        policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy)

    def test_exec(self, mock_ssh_client):
        """_exec should space-join the command and return decoded stdout/stderr."""
        mock_client, mock_session, _ = mock_ssh_client
        sess = ssh_paramiko.SshParamikoSession("host", "user")
        sess.ssh_client = mock_client

        stdout, stderr = sess._exec(["echo", "hello", "world"])

        # canned values come from the mock_ssh_client fixture streams
        assert stdout == "stdout contents"
        assert stderr == "stderr contents"
        mock_session.exec_command.assert_called_once_with("echo hello world")

    def test_upload_file_obj(self, mock_ssh_client):
        """upload_file_obj should putfo via SFTP and close the SFTP client."""
        mock_client, _, mock_sftp = mock_ssh_client
        sess = ssh_paramiko.SshParamikoSession("host", "user")
        sess.ssh_client = mock_client

        with tempfile.NamedTemporaryFile("r") as f_obj:
            sess.upload_file_obj(f_obj, "/tmp/file")

            mock_sftp.putfo.assert_called_once_with(f_obj, "/tmp/file")
        mock_sftp.close.assert_called_once_with()
@pytest.fixture
def mock_connect():
    """Patch SshParamikoSession._connect with a Mock and yield that Mock."""
    connect_mock = mock.Mock(spec=ssh_paramiko.SshParamikoSession._connect)
    target = "proxmoxer.backends.ssh_paramiko.SshParamikoSession._connect"
    with mock.patch(target, connect_mock):
        yield connect_mock
@pytest.fixture
def mock_ssh_client():
    """Yield a (client, channel, sftp) trio of paramiko mocks wired together so
    that an exec over the transport channel produces canned stdout/stderr and
    open_sftp() returns the sftp mock."""
    # pylint: disable=import-outside-toplevel
    from paramiko import SFTPClient, SSHClient, Transport, channel

    mock_client = mock.Mock(spec=SSHClient)
    mock_transport = mock.Mock(spec=Transport)
    mock_channel = mock.Mock(spec=channel.Channel)
    mock_sftp = mock.Mock(spec=SFTPClient)

    # mock the return streams from the SSH connection
    mock_stdout = mock.Mock(spec=channel.ChannelFile)
    mock_stderr = mock.Mock(spec=channel.ChannelStderrFile)
    mock_stdout.read.return_value = b"stdout contents"
    mock_stderr.read.return_value = b"stderr contents"

    # wire the mocks: client -> transport -> channel -> stdout/stderr streams
    mock_channel.makefile.return_value = mock_stdout
    mock_channel.makefile_stderr.return_value = mock_stderr
    mock_transport.open_session.return_value = mock_channel
    mock_client.get_transport.return_value = mock_transport
    mock_client.open_sftp.return_value = mock_sftp

    with mock.patch("paramiko.SSHClient", mock_client):
        yield (mock_client, mock_channel, mock_sftp)
07070100000032000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/tests/tools07070100000033000081A4000000000000000000000001675E3B1A0000005C000000000000000000000000000000000000002800000000proxmoxer-2.2.0/tests/tools/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
07070100000034000081A4000000000000000000000001675E3B1A00003525000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/tests/tools/test_files.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"
import logging
import tempfile
from unittest import mock
import pytest
from proxmoxer import ProxmoxAPI, core
from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums
from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
from ..files_mock import ( # pylint: disable=unused-import # noqa: F401
mock_files,
mock_files_and_pve,
)
MODULE_LOGGER_NAME = "proxmoxer.tools.files"
class TestChecksumInfo:
    """Tests for the ChecksumInfo value object."""

    def test_basic(self):
        checksum = ChecksumInfo("name", 123)

        assert checksum.name == "name"
        assert checksum.hex_size == 123

    def test_str(self):
        assert str(ChecksumInfo("name", 123)) == "name"

    def test_repr(self):
        assert repr(ChecksumInfo("name", 123)) == "name (123 digits)"
class TestGetChecksum:
    """Tests for the Files checksum-discovery helpers, using the mock_files
    fixture to serve checksum files at known URLs."""

    def test_get_checksum_from_sibling_file_success(self, mock_files):
        """A checksum file living next to the download should be found."""
        url = "https://sub.domain.tld/sibling/file.iso"
        exp_hash = "this_is_the_hash"

        info = ChecksumInfo("testing", 16)
        # both the URL-derived filename and an explicit filename should work
        res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
        res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso")

        assert res1 == exp_hash
        assert res2 == exp_hash

    def test_get_checksum_from_sibling_file_fail(self, mock_files):
        """A missing sibling checksum file should yield None."""
        url = "https://sub.domain.tld/sibling/missing.iso"

        info = ChecksumInfo("testing", 16)
        res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
        res2 = Files._get_checksum_from_sibling_file(
            url, checksum_info=info, filename="missing.iso"
        )

        assert res1 is None
        assert res2 is None

    def test_get_checksum_from_extension_success(self, mock_files):
        """A '<file>.<checksum>' extension file should be found."""
        url = "https://sub.domain.tld/extension/file.iso"
        exp_hash = "this_is_the_hash"

        info = ChecksumInfo("testing", 16)
        res1 = Files._get_checksum_from_extension(url, checksum_info=info)
        res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso")

        assert res1 == exp_hash
        assert res2 == exp_hash

    def test_get_checksum_from_extension_fail(self, mock_files):
        """Missing files and network errors (ConnectionError, ReadTimeout) yield None."""
        url = "https://sub.domain.tld/extension/missing.iso"

        info = ChecksumInfo("testing", 16)

        res1 = Files._get_checksum_from_extension(url, checksum_info=info)
        res2 = Files._get_checksum_from_extension(
            url, checksum_info=info, filename="connectionerror.iso"
        )
        res3 = Files._get_checksum_from_extension(
            url, checksum_info=info, filename="readtimeout.iso"
        )

        assert res1 is None
        assert res2 is None
        assert res3 is None

    def test_get_checksum_from_extension_upper_success(self, mock_files):
        """An upper-cased '<file>.<CHECKSUM>' extension file should be found."""
        url = "https://sub.domain.tld/upper/file.iso"
        exp_hash = "this_is_the_hash"

        info = ChecksumInfo("testing", 16)
        res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
        res2 = Files._get_checksum_from_extension_upper(
            url, checksum_info=info, filename="file.iso"
        )

        assert res1 == exp_hash
        assert res2 == exp_hash

    def test_get_checksum_from_extension_upper_fail(self, mock_files):
        """A missing upper-cased extension file should yield None."""
        url = "https://sub.domain.tld/upper/missing.iso"

        info = ChecksumInfo("testing", 16)
        res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
        res2 = Files._get_checksum_from_extension_upper(
            url, checksum_info=info, filename="missing.iso"
        )

        assert res1 is None
        assert res2 is None

    def test_get_checksums_from_file_url_all_checksums(self, mock_files):
        """Each supported checksum type should be discovered with its full hex length."""
        base_url = "https://sub.domain.tld/checksums/file.iso"
        full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
        for types_enum in SupportedChecksums:
            checksum_info = types_enum.value

            data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info)

            # the mock serves a fixed digit string truncated to each type's hex size
            assert data[0] == full_checksum_string[0 : checksum_info.hex_size]
            assert data[1] == checksum_info

    def test_get_checksums_from_file_url_missing(self, mock_files):
        """With no checksum source available, both tuple slots are None."""
        url = "https://sub.domain.tld/missing.iso"

        data = Files.get_checksums_from_file_url(url)

        assert data[0] is None
        assert data[1] is None
class TestFiles:
    """Tests for Files construction and remote file metadata lookup."""

    # token auth avoids any login round-trip during construction
    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")

    def test_init_basic(self):
        """Constructor arguments should be stored on the instance."""
        f = Files(self.prox, "node1", "storage1")

        assert f._prox == self.prox
        assert f._node == "node1"
        assert f._storage == "storage1"

    def test_repr(self):
        f = Files(self.prox, "node1", "storage1")
        assert (
            repr(f)
            == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))"
        )

    def test_get_file_info_pass(self, mock_pve):
        """A reachable ISO URL should return filename, mimetype, and size."""
        f = Files(self.prox, "node1", "storage1")
        info = f.get_file_info("https://sub.domain.tld/file.iso")

        assert info["filename"] == "file.iso"
        assert info["mimetype"] == "application/x-iso9660-image"
        assert info["size"] == 123456

    def test_get_file_info_fail(self, mock_pve):
        """An invalid URL should return None."""
        f = Files(self.prox, "node1", "storage1")
        info = f.get_file_info("https://sub.domain.tld/invalid.iso")

        assert info is None
class TestFilesDownload:
    """Tests for Files.download_file_to_storage() against the mocked PVE API."""

    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
    f = Files(prox, "node1", "storage1")

    def test_download_discover_checksum(self, mock_files_and_pve, caplog):
        """When a checksum can be auto-discovered, the download completes cleanly."""
        status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso")

        # this is the default "done" task mock information
        assert status == {
            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "OK",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == []

    def test_download_no_blocking(self, mock_files_and_pve, caplog):
        """blocking_status=False should still return the task status dict."""
        status = self.f.download_file_to_storage(
            "https://sub.domain.tld/checksums/file.iso", blocking_status=False
        )

        # this is the default "done" task mock information
        assert status == {
            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "OK",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == []

    def test_download_no_discover_checksum(self, mock_files_and_pve, caplog):
        """When no checksum can be discovered, a warning is logged but the download proceeds."""
        caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME)

        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso")

        # this is the default "stopped" task mock information
        assert status == {
            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "OK",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.WARNING,
                "Unable to discover checksum. Will not do checksum validation",
            ),
        ]

    def test_uneven_checksum(self, caplog, mock_files_and_pve):
        """Passing a checksum without its type is an error and aborts the download."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf")

        assert status is None

        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.ERROR,
                "Must pass both checksum and checksum_type or leave both None for auto-discovery",
            ),
        ]

    def test_uneven_checksum_type(self, caplog, mock_files_and_pve):
        """Passing a checksum_type without a checksum is likewise an error."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        status = self.f.download_file_to_storage(
            "https://sub.domain.tld/file.iso", checksum_type="asdf"
        )

        assert status is None

        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.ERROR,
                "Must pass both checksum and checksum_type or leave both None for auto-discovery",
            ),
        ]

    # NOTE(review): the two get_file_info tests below look like they belong in
    # TestFiles with the other get_file_info tests — confirm before moving.
    def test_get_file_info_missing(self, mock_pve):
        f = Files(self.prox, "node1", "storage1")
        info = f.get_file_info("https://sub.domain.tld/missing.iso")

        assert info is None

    def test_get_file_info_non_iso(self, mock_pve):
        f = Files(self.prox, "node1", "storage1")
        info = f.get_file_info("https://sub.domain.tld/index.html")

        assert info["filename"] == "index.html"
        assert info["mimetype"] == "text/html"
class TestFilesUpload:
    """Tests for Files.upload_local_file_to_storage() against the mocked PVE API."""

    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
    f = Files(prox, "node1", "storage1")

    def test_upload_no_file(self, mock_files_and_pve, caplog):
        """A nonexistent path should log an error and return None."""
        status = self.f.upload_local_file_to_storage("/does-not-exist.iso")

        assert status is None
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.ERROR,
                '"/does-not-exist.iso" does not exist or is not a file',
            ),
        ]

    def test_upload_dir(self, mock_files_and_pve, caplog):
        """A directory path should log an error and return None."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            status = self.f.upload_local_file_to_storage(tmp_dir)

            assert status is None
            assert caplog.record_tuples == [
                (
                    MODULE_LOGGER_NAME,
                    logging.ERROR,
                    f'"{tmp_dir}" does not exist or is not a file',
                ),
            ]

    def test_upload_empty_file(self, mock_files_and_pve, caplog):
        """An empty file is a valid upload."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            status = self.f.upload_local_file_to_storage(filename=f_obj.name)

            assert status is not None
            assert caplog.record_tuples == []

    def test_upload_non_empty_file(self, mock_files_and_pve, caplog):
        """A file with contents uploads without warnings."""
        with tempfile.NamedTemporaryFile("w+b") as f_obj:
            f_obj.write(b"a" * 100)
            f_obj.seek(0)
            status = self.f.upload_local_file_to_storage(filename=f_obj.name)

            assert status is not None
            assert caplog.record_tuples == []

    def test_upload_no_checksum(self, mock_files_and_pve, caplog):
        """Uploading with do_checksum_check=False skips checksum validation silently."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            status = self.f.upload_local_file_to_storage(
                filename=f_obj.name, do_checksum_check=False
            )

            assert status is not None
            assert caplog.record_tuples == []

    def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums):
        """With no hashlib algorithms available, checksum validation is skipped with a warning."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            status = self.f.upload_local_file_to_storage(filename=f_obj.name)

            assert status is not None
            assert caplog.record_tuples == [
                (
                    MODULE_LOGGER_NAME,
                    logging.WARNING,
                    "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation",
                )
            ]

    def test_upload_non_blocking(self, mock_files_and_pve, caplog):
        """blocking_status=False should still return a task status."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False)

            assert status is not None
            assert caplog.record_tuples == []

    def test_upload_proxmox_error(self, mock_files_and_pve, caplog):
        """Uploading to a missing storage should surface a 500 ResourceException."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            f_copy = Files(self.f._prox, self.f._node, "missing")

            with pytest.raises(core.ResourceException) as exc_info:
                f_copy.upload_local_file_to_storage(filename=f_obj.name)

        assert exc_info.value.status_code == 500
        assert exc_info.value.status_message == "Internal Server Error"
        # assert exc_info.value.content == "storage 'missing' does not exist"

    def test_upload_io_error(self, mock_files_and_pve, caplog):
        """An IOError while opening the file should be logged and return None."""
        with tempfile.NamedTemporaryFile("rb") as f_obj:
            mo = mock.mock_open()
            mo.side_effect = IOError("ERROR MESSAGE")

            with mock.patch("builtins.open", mo):
                status = self.f.upload_local_file_to_storage(filename=f_obj.name)

            assert status is None
            assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")]
@pytest.fixture
def apply_no_checksums():
    """Make hashlib report no available algorithms, forcing the no-checksum path."""
    patcher = mock.patch("hashlib.algorithms_available", set())
    with patcher:
        yield
07070100000035000081A4000000000000000000000001675E3B1A000022B1000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/tests/tools/test_tasks.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import logging
import pytest
from proxmoxer import ProxmoxAPI
from proxmoxer.tools import Tasks
from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
class TestBlockingStatus:
    """Tests for Tasks.blocking_status() against the mocked PVE API; each test
    also pins the exact request/response log output of proxmoxer.core."""

    def test_basic(self, mocked_prox, caplog):
        """A task already marked done returns its status after one poll."""
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")

        status = Tasks.blocking_status(
            mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done"
        )

        assert status == {
            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "OK",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'',
            ),
        ]

    def test_zeroed(self, mocked_prox, caplog):
        """A UPID with all-zero numeric fields decodes to zeros in the status."""
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")

        status = Tasks.blocking_status(
            mocked_prox, "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment"
        )

        assert status == {
            "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
            "node": "node",
            "pid": 0,
            "pstart": 0,
            "starttime": 0,
            "type": "task",
            "id": "id",
            "user": "root@pam",
            "status": "stopped",
            "exitstatus": "OK",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node/tasks/UPID:node:00000000:00000000:00000000:task:id:root@pam:comment/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK"}}\'',
            ),
        ]

    def test_killed(self, mocked_prox, caplog):
        """An interrupted task reports exitstatus 'interrupted by signal'."""
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")

        status = Tasks.blocking_status(
            mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped"
        )

        assert status == {
            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "interrupted by signal",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1"}}\'',
            ),
        ]

    def test_timeout(self, mocked_prox, caplog):
        """A task that never stops makes blocking_status poll until timeout and return None."""
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")

        # timeout of 0.021s with a 0.01s polling interval allows exactly 3 polls
        status = Tasks.blocking_status(
            mocked_prox,
            "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
            timeout=0.021,
            polling_interval=0.01,
        )

        assert status is None
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
            ),
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
            ),
            (
                "proxmoxer.core",
                20,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
            ),
            (
                "proxmoxer.core",
                10,
                'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
            ),
        ]
class TestDecodeUpid:
    """Tests for Tasks.decode_upid()."""

    def test_basic(self):
        # id and comment fields are empty in this UPID
        upid = "UPID:node:000CFC5C:03E8D0C3:6194806C:aptupdate::root@pam:"
        fields = Tasks.decode_upid(upid)

        expected = {
            "upid": upid,
            "node": "node",
            "pid": 851036,
            "pstart": 65589443,
            "starttime": 1637122156,
            "type": "aptupdate",
            "id": "",
            "user": "root@pam",
            "comment": "",
        }
        for key, value in expected.items():
            assert fields[key] == value

    def test_all_values(self):
        upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:local"
        fields = Tasks.decode_upid(upid)

        expected = {
            "upid": upid,
            "node": "node1",
            "pid": 851962,
            "pstart": 65597267,
            "starttime": 1637122234,
            "type": "vzdump",
            "id": "103",
            "user": "root@pam",
            "comment": "local",
        }
        for key, value in expected.items():
            assert fields[key] == value

    def test_invalid_length(self):
        # only 9 of the expected 10 colon-separated segments
        bad_upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam"
        with pytest.raises(AssertionError) as exc_info:
            Tasks.decode_upid(bad_upid)
        assert str(exc_info.value) == "UPID is not in the correct format"

    def test_invalid_start(self):
        # does not begin with the "UPID" sentinel
        bad_upid = "ASDF:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:"
        with pytest.raises(AssertionError) as exc_info:
            Tasks.decode_upid(bad_upid)
        assert str(exc_info.value) == "UPID is not in the correct format"
class TestDecodeLog:
    """Tests for Tasks.decode_log()."""

    def test_basic(self):
        entries = [{"n": 1, "t": "client connection: 127.0.0.1:49608"}, {"t": "TASK OK", "n": 2}]
        assert Tasks.decode_log(entries) == "client connection: 127.0.0.1:49608\nTASK OK"

    def test_empty(self):
        assert Tasks.decode_log([]) == ""

    def test_unordered(self):
        # lines must be joined in 'n' order, not input order
        entries = [{"n": 3, "t": "third"}, {"t": "first", "n": 1}, {"t": "second", "n": 2}]
        assert Tasks.decode_log(entries) == "first\nsecond\nthird"
@pytest.fixture
def mocked_prox(mock_pve):
    # a ProxmoxAPI client pointed at the mocked PVE API endpoint
    return ProxmoxAPI("1.2.3.4:1234", user="user", password="password")
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!384 blocks