File proxmoxer-2.2.0.obscpio of Package python-proxmoxer

07070100000000000081A4000000000000000000000001675E3B1A00000058000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.banditskips:
  - B105
  - B106

assert_used:
  skips:
    - '*/*_test.py'
    - '*/test_*.py'
07070100000001000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001E00000000proxmoxer-2.2.0/.devcontainer07070100000002000081A4000000000000000000000001675E3B1A0000033B000000000000000000000000000000000000002900000000proxmoxer-2.2.0/.devcontainer/Dockerfile# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.166.1/containers/python-3/.devcontainer/base.Dockerfile

# [Choice] Python version: 3, 3.11, 3.10, 3.9, 3.8, 3.7, 3.6
ARG VARIANT="3"
FROM mcr.microsoft.com/vscode/devcontainers/python:${VARIANT}

# [Optional] If your pip requirements rarely change, keep this section to add them to the image.
COPY test_requirements.txt dev_requirements.txt /tmp/pip-tmp/
RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/test_requirements.txt -r /tmp/pip-tmp/dev_requirements.txt \
   && rm -rf /tmp/pip-tmp

# [Optional] Uncomment this section to install additional OS packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
#     && apt-get -y install --no-install-recommends <your-package-list-here>
07070100000003000081A4000000000000000000000001675E3B1A000007B7000000000000000000000000000000000000003000000000proxmoxer-2.2.0/.devcontainer/devcontainer.json// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
  "name": "Proxmoxer Development",
  "build": {
    "dockerfile": "Dockerfile",
    "context": "..",
    "args": {
      // Update 'VARIANT' to pick a Python version: 3, 3.6, 3.7, 3.8, 3.9, 3.10, 3.11
      "VARIANT": "3.8"
    }
  },
  // Set *default* container specific settings.json values on container create.
  "customizations": {
    "vscode": {
      "settings": {
        "terminal.integrated.shell.linux": "/bin/bash",
        "python.pythonPath": "/usr/local/bin/python",
        "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
        "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
        "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
        "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
        "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
        "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
        "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
        "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
        "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
      },
      // Add the IDs of extensions you want installed when the container is created.
      "extensions": [
        "mhutchie.git-graph",
        "ms-python.python",
        "njpwerner.autodocstring",
        "ryanluker.vscode-coverage-gutters",
        "streetsidesoftware.code-spell-checker"
      ]
    }
  },
  // Use 'forwardPorts' to make a list of ports inside the container available locally.
  // "forwardPorts": [],
  // Run commands to prepare the container for use
  "postCreateCommand": ".devcontainer/setup.sh",
  // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
  "remoteUser": "vscode"
}
07070100000004000081ED000000000000000000000001675E3B1A00000184000000000000000000000000000000000000002700000000proxmoxer-2.2.0/.devcontainer/setup.sh#!/bin/bash

# install proxmoxer as an editable package
pip3 install -e .
rm -rf proxmoxer.egg-info/

# hide the mass-formatting commits from git blames
git config blame.ignorerevsfile .git-blame-ignore-revs

# install the git hook for pre-commit
pre-commit install

# run pre-commit on a simple file to ensure it downloads all needed tools
pre-commit run --files .pre-commit-config.yaml
07070100000005000081A4000000000000000000000001675E3B1A0000008C000000000000000000000000000000000000002700000000proxmoxer-2.2.0/.git-blame-ignore-revs# use with `git config blame.ignorerevsfile .git-blame-ignore-revs`

# Format code base with Black
7a976de985fc7b71fdf31d3161f223eeaada38da
07070100000006000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.github07070100000007000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002200000000proxmoxer-2.2.0/.github/workflows07070100000008000081A4000000000000000000000001675E3B1A00000786000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/.github/workflows/ci.yamlname: CI

on:
  push:

  pull_request:

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

jobs:
  unit-test:
    continue-on-error: ${{ github.repository == 'proxmoxer/proxmoxer' }}
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version:
          - "3.8"
          - "3.9"
          - "3.10"
          - "3.11"
          - "3.12"

    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Cache PIP packages
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
          key: ${{ runner.os }}-pip-python${{ matrix.python-version }}-${{ hashFiles('*requirements.txt') }}
          restore-keys: |
            ${{ runner.os }}-pip-python${{ matrix.python-version }}-
            ${{ runner.os }}-pip-

      - name: Install pip Packages
        run: pip install -r test_requirements.txt

      - name: Install Self as Package
        run: pip install .

      - name: Run Tests
        run: pytest -v --cov tests/

      - name: Run pre-commit lint/format checks
        uses: pre-commit/action@v3.0.0

      - name: Upload coverage data to coveralls.io
        if: github.repository == 'proxmoxer/proxmoxer'
        run: coveralls --service=github
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          COVERALLS_FLAG_NAME: Unit Test (${{ matrix.python-version }})
          COVERALLS_PARALLEL: true


  complete:
    name: Finalize Coveralls Report
    if: github.repository == 'proxmoxer/proxmoxer'
    needs: unit-test
    runs-on: ubuntu-latest
    steps:
      - name: Coveralls Finished
        uses: coverallsapp/github-action@1.1.3
        with:
          parallel-finished: true
          github-token: ${{ secrets.GITHUB_TOKEN }}
07070100000009000081A4000000000000000000000001675E3B1A00000841000000000000000000000000000000000000001B00000000proxmoxer-2.2.0/.gitignore# IDE files
.idea
*.code-workspace

coverage.*

# generated files
README.txt

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/
0707010000000A000081A4000000000000000000000001675E3B1A0000060B000000000000000000000000000000000000002800000000proxmoxer-2.2.0/.pre-commit-config.yamlrepos:
  ###### FORMATTING ######
  - repo: https://github.com/psf/black-pre-commit-mirror
    rev: 23.11.0
    hooks:
      - id: black
        language_version: python3 # Should be a command that runs python3.6+

  - repo: https://github.com/PyCQA/isort
    rev: 5.12.0
    hooks:
      - id: isort
        name: isort (python)
      - id: isort
        name: isort (pyi)
        types: [pyi]

  ###### LINTING ######
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.5
    hooks:
      - id: bandit
        args: ["--configfile", ".bandit", "--baseline", "tests/known_issues.json"]

  - repo: https://github.com/PyCQA/flake8
    rev: 6.1.0
    hooks:
      - id: flake8
      # any flake8 plugins must be included in the hook venv
      # additional_dependencies: [flake8-docstrings]

  # - repo: https://github.com/PyCQA/pylint
  #   rev: v2.8.2
  #   hooks:
  #     - id: pylint

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: check-case-conflict
      - id: check-symlinks
      - id: destroyed-symlinks
      - id: check-merge-conflict
      - id: check-docstring-first
      - id: mixed-line-ending
        args: [--fix=no]

  - repo: https://github.com/asottile/blacken-docs
    rev: 1.16.0
    hooks:
    -   id: blacken-docs
        additional_dependencies: [black==23.11.0]

  - repo: https://github.com/pre-commit/pygrep-hooks
    rev: v1.10.0
    hooks:
      - id: python-no-eval
      - id: rst-backticks
      - id: rst-directive-colons
      - id: rst-inline-touching-normal
0707010000000B000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001800000000proxmoxer-2.2.0/.vscode0707010000000C000081A4000000000000000000000001675E3B1A00000419000000000000000000000000000000000000002600000000proxmoxer-2.2.0/.vscode/settings.json{
  "python.linting.enabled": true,
  "python.formatting.provider": "black",
  "editor.formatOnPaste": false,
  "python.linting.flake8Enabled": true,
  "python.linting.pylintEnabled": true,
  "python.linting.banditEnabled": true,
  "python.linting.banditArgs": [
    "--baseline",
    "tests/known_issues.json",
    "--configfile",
    ".bandit"
  ],
  "[python]": {
    "editor.codeActionsOnSave": {
      "source.organizeImports": "explicit"
    }
  },
  "cSpell.words": [
    "auths",
    "Butovich",
    "caplog",
    "cpus",
    "Oleg",
    "onboot",
    "ostemplate",
    "Paramiko",
    "proxmoxer",
    "pvesh",
    "resps",
    "rtype",
    "sess",
    "toolbelt",
    "vmid",
    "vztmpl"
  ],
  "autoDocstring.docstringFormat": "sphinx",
  "autoDocstring.startOnNewLine": true,
  "python.testing.pytestArgs": [
    "--cov",
    "--cov-report",
    "xml:coverage.xml",
    "tests/",
  ],
  "python.testing.unittestEnabled": false,
  "python.testing.pytestEnabled": true,
  "coverage-gutters.coverageFileNames": [
    "coverage.xml"
  ],
}
0707010000000D000081A4000000000000000000000001675E3B1A00000AEA000000000000000000000000000000000000002300000000proxmoxer-2.2.0/.vscode/tasks.json{
  // See https://go.microsoft.com/fwlink/?LinkId=733558
  // for the documentation about the tasks.json format
  "version": "2.0.0",
  "tasks": [
    {
      "label": "Run Tests (with coverage file)",
      "type": "shell",
      "command": "pytest -v --cov --cov-report xml:coverage.xml tests/",
      "problemMatcher": [],
      "icon": {
        "id": "beaker",
        "color": "terminal.ansiGreen"
      },
      "runOptions": {
        "instanceLimit": 1
      },
      "group": {
        "kind": "test",
        "isDefault": true
      },
      "presentation": {
        "echo": true,
        "reveal": "silent",
        "focus": false,
        "panel": "dedicated",
        "showReuseMessage": false,
        "clear": true
      },
    },
    {
      "label": "Run Tests (with coverage)",
      "type": "shell",
      "command": "pytest --cov tests/",
      "problemMatcher": [],
      "icon": {
        "id": "beaker",
        "color": "terminal.ansiGreen"
      },
      "runOptions": {
        "instanceLimit": 1
      },
      "group": {
        "kind": "test",
        "isDefault": false
      },
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "dedicated",
        "showReuseMessage": true,
        "clear": true
      },
    },
    {
      "label": "Run Tests",
      "type": "shell",
      "command": "pytest -v tests/",
      "problemMatcher": [],
      "icon": {
        "id": "beaker",
      },
      "group": {
        "kind": "test"
      },
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "dedicated",
        "showReuseMessage": false,
        "clear": true
      },
    },
    {
      "label": "Update bandit baseline",
      "type": "shell",
      "command": "bandit --configfile .bandit -f json -r tests/ proxmoxer/ >| tests/known_issues.json",
      "problemMatcher": [],
      "runOptions": {
        "instanceLimit": 1
      },
      "group": {
        "kind": "none"
      },
      "icon": {
        "id": "bookmark"
      },
      "presentation": {
        "echo": true,
        "reveal": "never",
        "focus": false,
        "panel": "shared",
        "showReuseMessage": false,
        "clear": false
      },
    },
    {
      "label": "Clean Cache/tmp files",
      "type": "shell",
      "command": "rm -rf ./.mypy_cache/ ./.pytest_cache/ ./dist/ ./coverage.xml ./.coverage README.txt",
      "problemMatcher": [],
      "group": {
        "kind": "none"
      },
      "icon": {
        "id": "trashcan"
      },
      "presentation": {
        "echo": true,
        "reveal": "never",
        "focus": false,
        "panel": "shared",
        "showReuseMessage": false,
        "clear": false
      },
    }
  ]
}
0707010000000E000081A4000000000000000000000001675E3B1A0000240B000000000000000000000000000000000000001D00000000proxmoxer-2.2.0/CHANGELOG.md## 2.2.0 (2024-12-13)

* Bugfix (local,openssh,paramiko): Remove IP/hostname from command path ([Andrea Dainese](https://github.com/dainok), [John Hollowell](https://github.com/jhollowe))
* Addition (https): Allow passing certificate for TLS verification ([gdelaneync](https://github.com/gdelaneync))
* Bugfix (local,openssh,paramiko): Prevent a returned task ID (UPID) from throwing an error ([Adam Dorsey](https://github.com/asdorsey), [John Hollowell](https://github.com/jhollowe))
* Improvement (local,openssh,paramiko): Attempt to encode binary payloads as UTF-8 before sending/JSON-ing ([Adam Dorsey](https://github.com/asdorsey))

## 2.1.0 (2024-08-10)

* Improvement (docs): Update Readme with updated example ([Rob Wolinski](https://github.com/trekie86))
* Addition (tools): Added Files tools ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Add repr to some classes and add to tests ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Correct metadata to match supported Python versions (3.6+) ([Alexei Znamensky](https://github.com/russoz))
* Bugfix (https): Fix BytesWarning when logging response status/content ([Walter Doekes](https://github.com/wdoekes))
* Improvement (meta): Update devcontainer to modern unified schema ([John Hollowell](https://github.com/jhollowe))
* Improvement (meta): Add 3.12 to CI matrix, remove 3.7 testing ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Fix improper splitting of non-exec QEMU commands ([John Hollowell](https://github.com/jhollowe))

## 2.0.1 (2022-12-19)

* Bugfix (https): properly pass verify_ssl all the way to the backend auth ([Dominik Rimpf](https://github.com/domrim))

## 2.0.0 (2022-11-27)

* Improvement (all): Convert testing framework to use pytest ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Remove Python 2.x support (minimum version of 3.7) ([John Hollowell](https://github.com/jhollowe))
* Improvement (all): Refactor code to Python 3 standards ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Remove None values from request data and params ([Kristian Heljas](https://github.com/kristianheljas))
* Addition (tools): Added Task tools ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Allow specifying resource_id as 0 ([John Bergvall](https://github.com/johnbergvall))
* Improvement (all): Remove ProxmoxResourceBase ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Add platform detection before using shlex functions ([Kevin Boyd](https://github.com/r3d07))
* Improvement (https): Added `path_prefix` argument which is appended after the root of the URL (before `api2/`) ([John Hollowell](https://github.com/jhollowe))

### Breaking Changes
* `ProxmoxResourceBase` removed
* `proxmoxer.backends.https.AuthenticationError` moved to `proxmoxer.AuthenticationError`
* Removed `ProxmoxHTTPTicketAuth` and its arguments `auth_token` and `csrf_token`
* keyword arguments to backends order changed (should not affect users specifying arguments by name)

## 1.3.1 (2022-05-14)

* Bugfix (all): fix error handling for APIs that don't give a dict in the response ([Alex Wuillaume](https://github.com/wuillaumea))

## 1.3.0 (2022-03-13)

* Addition (local): Added `local` backend for running directly on Proxmox hosts. ([Markus Reiter](https://github.com/reitermarkus))
* Bugfix (all): properly parse command string sent to QEMU guest agent ([John Hollowell](https://github.com/jhollowe))
* Improvement (command_base): Refactor code to have a unified CLI backend base for `openssh`, `ssh_paramiko`, and `local` backends ([Markus Reiter](https://github.com/reitermarkus))
* Improvement (https): Support IPv6 addresses ([Daviddcc](https://github.com/dcasier))
* Improvement: Move CI to GitHub actions from Travis.ci ([John Hollowell](https://github.com/jhollowe))
* Improvement: Cleanup documentation and move to dedicated site ([John Hollowell](https://github.com/jhollowe))
* Improvement: Add `pre-commit` hooks for formatting and linting and format all code ([John Hollowell](https://github.com/jhollowe))

## 1.2.0 (2021-10-07)
* Addition (https): Added OTP code support to authentication ([John Hollowell](https://github.com/jhollowe))
* Addition (https): Added support for large file uploads using requests_toolbelt module ([John Hollowell](https://github.com/jhollowe))
* Addition (all): Added support for Proxmox Mail Gateway (PMG) and Proxmox Backup Server (PBS) with parameter validation ([Gabriel Cardoso de Faria](https://github.com/gabrielcardoso21), [John Hollowell](https://github.com/jhollowe))
* Addition (all): Added detailed information to ResourceException ([mihailstoynov](https://github.com/mihailstoynov))
* Bugfix (base_ssh): Resolved issue with values containing spaces by encapsulating values in quotes ([mihailstoynov](https://github.com/mihailstoynov))
* Bugfix (all): Resolved issue with using get/post/put/delete on a base ProxmoxAPI object ([John Hollowell](https://github.com/jhollowe))
* Bugfix (all): Added support for responses which are not JSON ([John Hollowell](https://github.com/jhollowe))
* Improvement: Added and updated documentation ([Ananias Filho](https://github.com/ananiasfilho), [Thomas Baag](https://github.com/b2ag))
* Improvement: Tests are now not installed when using PIP ([Ville Skyttä](https://github.com/scop))
* Addition: Devcontainer definition now available to make development easier ([John Hollowell](https://github.com/jhollowe))

## 1.1.1 (2020-06-23)
* Bugfix (https): correctly renew ticket in the session, not just the auth ([John Hollowell](https://github.com/jhollowe))

## 1.1.0 (2020-05-22)
* Addition (https): Added API Token authentication ([John Hollowell](https://github.com/jhollowe))
* Improvement (https): user/password authentication refreshes ticket to prevent expiration ([CompileNix](https://github.com/compilenix), [John Hollowell](https://github.com/jhollowe))
* Bugfix (ssh_paramiko): Handle empty stderr from ssh connections ([morph027](https://github.com/morph027))
* DEPRECATED (https): using ``auth_token`` and ``csrf_token`` (ProxmoxHTTPTicketAuth) is now deprecated. Either pass the ``auth_token`` as the ``password`` or use the API Tokens.

## 1.0.4 (2020-01-24)
* Improvement (https): Added timeout to authentication (James Lin)
* Improvement (https): Handle AnyEvent::HTTP status codes gracefully (Georges Martin)
* Improvement (https): Advanced error message with error code >=400 ([ssi444](https://github.com/ssi444))
* Bugfix (ssh): Fix pvesh output format for version > 5.3 ([timansky](https://github.com/timansky))
* Transferred development to proxmoxer organization

## 1.0.3 (2018-09-10)
* Improvement (https): Added option to specify port in hostname parameter ([pvanagtmaal](https://github.com/pvanagtmaal))
* Improvement: Added stderr to the Response content ([Jérôme Schneider](https://github.com/merinos))
* Bugfix (ssh_paramiko): Paramiko python3: stdout and stderr must be a str not bytes ([Jérôme Schneider](https://github.com/merinos))
* New lxc example in documentation ([Geert Stappers](https://github.com/stappersg))

## 1.0.2 (2017-12-02)
* Tarball repackaged with tests

## 1.0.1 (2017-12-02)
* LICENSE file now included in tarball
* Added verify_ssl parameter to ProxmoxHTTPAuth ([Walter Doekes](https://github.com/wdoekes))

## 1.0.0 (2017-11-12)
* Update Proxmoxer readme ([Emmanuel Kasper](https://github.com/EmmanuelKasper))
* Display the reason of API calls errors ([Emmanuel Kasper](https://github.com/EmmanuelKasper), [kantsdog](https://github.com/kantsdog))
* Filter for ssh response code ([Chris Plock](https://github.com/chrisplo))

## 0.2.5 (2017-02-12)
* Adding sudo to execute CLI with paramiko ssh backend ([Jason Meridth](https://github.com/jmeridth))
* Proxmoxer/backends/ssh_paramiko: improve file upload ([Jérôme Schneider](https://github.com/merinos))

## 0.2.4 (2016-05-02)
* Removed newline in tmp_filename string ([Jérôme Schneider](https://github.com/merinos))
* Fix to avoid module reloading ([jklang](https://github.com/jklang))

## 0.2.3 (2016-01-20)
* Minor typo fix ([Srinivas Sakhamuri](https://github.com/srsakhamuri))

## 0.2.2 (2016-01-19)
* Adding sudo to execute pvesh CLI in openssh backend ([Wei Tie](https://github.com/TieWei), [Srinivas Sakhamuri](https://github.com/srsakhamuri))
* Add support to specify an identity file for ssh connections ([Srinivas Sakhamuri](https://github.com/srsakhamuri))

## 0.2.1 (2015-05-02)
* fix for python 3.4 ([kokuev](https://github.com/kokuev))

## 0.2.0 (2015-03-21)
* Https will now raise AuthenticationError when appropriate. ([scap1784](https://github.com/scap1784))
* Preliminary python 3 compatibility. ([wdoekes](https://github.com/wdoekes))
* Additional example. ([wdoekes](https://github.com/wdoekes))

## 0.1.7 (2014-11-16)
* Added ignore of "InsecureRequestWarning: Unverified HTTPS request is being made..." warning while using https (requests) backend.

## 0.1.4 (2013-06-01)
* Added logging
* Added openssh backend
* Tests are reorganized

## 0.1.3 (2013-05-30)
* Added next tests
* Bugfixes

## 0.1.2 (2013-05-27)

* Added first tests
* Added support for travis and coveralls
* Bugfixes

## 0.1.1 (2013-05-13)
* Initial try.
0707010000000F000081A4000000000000000000000001675E3B1A00000431000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/LICENSE.txtThe MIT License

Copyright (c) 2013 Oleg Butovich

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.07070100000010000081A4000000000000000000000001675E3B1A00000077000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/MANIFEST.ininclude LICENSE.txt
include README.txt
include README.rst
include CHANGELOG.md
global-exclude *.orig *.pyc *.log *.swp
07070100000011000081A4000000000000000000000001675E3B1A00001004000000000000000000000000000000000000001B00000000proxmoxer-2.2.0/README.rst================================================
Proxmoxer: A Python wrapper for Proxmox REST API
================================================

master branch:  |master_build_status| |master_coverage_status| |pypi_version| |pypi_downloads|

develop branch: |develop_build_status| |develop_coverage_status|


Proxmoxer is a python wrapper around the `Proxmox REST API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_.
It currently supports the Proxmox services of Proxmox Virtual Environment (PVE), Proxmox Mail Gateway (PMG), and Proxmox Backup Server (PBS).

It was inspired by slumber, but it is dedicated only to Proxmox. It allows not only REST API use over HTTPS, but
also use of the same API over SSH with the pvesh utility.

Like `Proxmoxia <https://github.com/baseblack/Proxmoxia>`_, it dynamically creates attributes which respond to the
attributes you've attempted to reach.

Full Documentation is available at https://proxmoxer.github.io/docs/
--------------------------------------------------------------------

Migrating to version 2
......................

Full instructions for the minimal steps needed to update to version 2 can be found in `Migration Docs <https://proxmoxer.github.io/docs/latest/v1_migration/>`_.

Installation
............

.. code-block:: bash

    pip install proxmoxer

To use the 'https' backend, install requests

.. code-block:: bash

    pip install requests

To use the 'ssh_paramiko' backend, install paramiko

.. code-block:: bash

    pip install paramiko

To use the 'openssh' backend, install openssh_wrapper

.. code-block:: bash

    pip install openssh_wrapper


Short usage information
.......................

The first thing to do is import the proxmoxer library and create ProxmoxAPI instance.

.. code-block:: python

    from proxmoxer import ProxmoxAPI

    proxmox = ProxmoxAPI(
        "proxmox_host", user="admin@pam", password="secret_word", verify_ssl=False
    )

This will connect by default to PVE through the 'https' backend.

**Note: ensure you have the required libraries (listed above) for the connection method you are using**

Queries are exposed via the access methods **get**, **post**, **put** and **delete**. For convenience two
synonyms are available: **create** for **post**, and **set** for **put**.

Using the paths from the `PVE API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_, you can create
API calls using the access methods above.

.. code-block:: pycon

    >>> for node in proxmox.nodes.get():
    ...     for vm in proxmox.nodes(node["node"]).qemu.get():
    ...         print(f"{vm['vmid']}. {vm['name']} => {vm['status']}")
    ...

    141. puppet-2.london.example.com => running
    101. munki.london.example.com => running
    102. redmine.london.example.com => running
    140. dns-1.london.example.com => running
    126. ns-3.london.example.com => running
    113. rabbitmq.london.example.com => running


See Changelog in `CHANGELOG.md <https://github.com/proxmoxer/proxmoxer/blob/develop/CHANGELOG.md>`_
...................................................................................................

.. |master_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=master
    :target: https://github.com/proxmoxer/proxmoxer/actions

.. |master_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/master
    :target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=master

.. |develop_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=develop
    :target: https://github.com/proxmoxer/proxmoxer/actions

.. |develop_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/develop
    :target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=develop

.. |pypi_version| image:: https://img.shields.io/pypi/v/proxmoxer.svg
    :target: https://pypi.python.org/pypi/proxmoxer

.. |pypi_downloads| image:: https://img.shields.io/pypi/dm/proxmoxer.svg
    :target: https://pypi.python.org/pypi/proxmoxer
07070100000012000081A4000000000000000000000001675E3B1A0000001C000000000000000000000000000000000000002500000000proxmoxer-2.2.0/dev_requirements.txttwine
setuptools
pre-commit
07070100000013000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001A00000000proxmoxer-2.2.0/proxmoxer07070100000014000081A4000000000000000000000001675E3B1A00000092000000000000000000000000000000000000002600000000proxmoxer-2.2.0/proxmoxer/__init__.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2024"
__version__ = "2.2.0"
__license__ = "MIT"

from .core import *  # noqa
07070100000015000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002300000000proxmoxer-2.2.0/proxmoxer/backends07070100000016000081A4000000000000000000000001675E3B1A0000005F000000000000000000000000000000000000002F00000000proxmoxer-2.2.0/proxmoxer/backends/__init__.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
07070100000017000081A4000000000000000000000001675E3B1A000014DB000000000000000000000000000000000000003300000000proxmoxer-2.2.0/proxmoxer/backends/command_base.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"


import json
import logging
import platform
import re
from itertools import chain
from shlex import split as shell_split

from proxmoxer.core import SERVICES

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)


try:
    # shlex.join only exists on newer interpreters (added in Python 3.8)
    from shlex import join
except ImportError:
    from shlex import quote

    def shell_join(args):
        """Fallback: quote every argument and join them with single spaces.

        :param args: iterable of argument strings
        :return: a single shell-escaped command string
        :rtype: str
        """
        return " ".join(quote(arg) for arg in args)

else:

    def shell_join(args):
        """Join the arguments into one shell-escaped string via ``shlex.join``.

        :param args: iterable of argument strings
        :return: a single shell-escaped command string
        :rtype: str
        """
        return join(args)


class Response:
    """Simple container for the result of a command-based request.

    Holds the raw ``content``, its ``str()`` rendering as ``text``, the
    ``status_code`` and a fixed JSON ``content-type`` header.
    """

    def __init__(self, content, status_code):
        """Store the content and status code of a finished command.

        :param content: raw result of the command; kept unchanged as ``self.content``
        :param status_code: status code to report for the command
        """
        self.content = content
        self.status_code = status_code
        # the content type is always reported as JSON
        self.headers = {"content-type": "application/json"}
        self.text = str(content)

    def __str__(self):
        """Return a short human-readable summary of status code and content."""
        return "Response ({}) {}".format(self.status_code, self.content)


class CommandBaseSession:
    """Base session that turns API-style requests into ``<service>sh`` CLI calls.

    Subclasses provide the transport by implementing :meth:`_exec` (run a command
    and return ``(stdout, stderr)``) and :meth:`upload_file_obj` (copy a file
    object to a path on the target).
    """

    def __init__(
        self,
        service="PVE",
        timeout=5,
        sudo=False,
    ):
        """
        :param service: service name; lower-cased and used as the prefix of the ``sh`` command
        :param timeout: timeout value stored on the session for use by subclasses
        :param sudo: when true, every command is prefixed with ``sudo``
        """
        self.service = service.lower()
        self.timeout = timeout
        self.sudo = sudo

    def _exec(self, cmd):
        """Run ``cmd`` (a list of arguments) and return ``(stdout, stderr)``.

        Must be implemented by subclasses.
        """
        raise NotImplementedError()

    # noinspection PyUnusedLocal
    def request(self, method, url, data=None, params=None, headers=None):
        """Run the CLI command matching an API request and wrap its output.

        :param method: HTTP-style verb; ``post`` maps to ``create`` and ``put`` to
            ``set``, any other verb is passed through lower-cased
        :param url: API path handed to the CLI tool
        :param data: options sent as ``-key value`` pairs (not modified)
        :param params: further options sent as ``-key value`` pairs (not modified)
        :param headers: accepted for interface compatibility, unused
        :return: a ``Response`` holding stdout (or stderr when stdout is empty) and
            a status code derived from stderr
        """
        method = method.lower()
        # work on copies so the caller's dictionaries are never modified
        data = dict(data) if data else {}
        params = dict(params) if params else {}
        url = url.strip()

        cmd = {"post": "create", "put": "set"}.get(method, method)

        # separate out qemu exec commands to split into multiple argument pairs (issue#89)
        data_command = None
        if "/agent/exec" in url:
            data_command = data.pop("command", None)

        # for 'upload' call some workaround
        if url.endswith("upload"):
            # copy file to temporary location on proxmox host
            tmp_filename, _ = self._exec(
                [
                    "python3",
                    "-c",
                    "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
                ]
            )
            # _exec may return bytes or already-decoded text depending on the
            # subclass; str(text, "utf-8") would raise TypeError for the latter
            if isinstance(tmp_filename, (bytes, bytearray)):
                tmp_filename = str(tmp_filename, "utf-8")
            self.upload_file_obj(data["filename"], tmp_filename)
            data["filename"] = data["filename"].name
            data["tmpfilename"] = tmp_filename

        command = [f"{self.service}sh", cmd, url]
        # convert the options dict into a 2-tuple with the key formatted as a flag
        option_pairs = []
        for k, v in chain(data.items(), params.items()):
            try:
                # bytes-like values are decoded ...
                option_pairs.append((f"-{k}", str(v, "utf-8")))
            except TypeError:
                # ... everything else is simply stringified
                option_pairs.append((f"-{k}", str(v)))
        # add back in all the command arguments as their own pairs
        if data_command is not None:
            if isinstance(data_command, list):
                command_arr = data_command
            elif "Windows" not in platform.platform():
                command_arr = shell_split(data_command)
            else:
                # the command string is not split on Windows; pass it through as a
                # single argument instead of failing on an unbound variable
                command_arr = [data_command]
            for arg in command_arr:
                option_pairs.append(("-command", arg))
        # expand the list of 2-tuples into a flat list
        options = [val for pair in option_pairs for val in pair]
        additional_options = SERVICES[self.service.upper()].get("cli_additional_options", [])
        full_cmd = command + options + additional_options

        if self.sudo:
            full_cmd = ["sudo"] + full_cmd

        stdout, stderr = self._exec(full_cmd)

        def is_http_status_string(s):
            # str(b"500 x") would start with "b'" and never match, so decode first
            if isinstance(s, (bytes, bytearray)):
                s = s.decode("utf-8", errors="replace")
            return re.match(r"\d\d\d [a-zA-Z]", str(s))

        if stderr:
            # assume if we got a task ID that the request was successful
            task_id_pattern = re.compile(
                r"UPID:[\w-]+:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:\w+:[\w\._-]+:[\w\.@_-]+:\w*"
            )
            if task_id_pattern.search(str(stdout)) or task_id_pattern.search(str(stderr)):
                status_code = 200
            else:
                # sometimes contains extra text like 'trying to acquire lock...OK'
                # use the first line that looks like "<code> <text>", else 500
                status_code = next(
                    (
                        int(line.split()[0])
                        for line in stderr.splitlines()
                        if is_http_status_string(line)
                    ),
                    500,
                )
        else:
            status_code = 200
        if stdout:
            return Response(stdout, status_code)
        return Response(stderr, status_code)

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` on the target.

        Must be implemented by subclasses.
        """
        raise NotImplementedError()


class JsonSimpleSerializer:
    """Decode responses whose payload is a plain JSON document."""

    def loads(self, response):
        """Return ``response.content`` parsed as JSON.

        When the content cannot be decoded, ``{"errors": response.content}`` is
        returned instead.
        """
        try:
            decoded = json.loads(response.content)
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}
        else:
            return decoded

    def loads_errors(self, response):
        """Return the ``errors`` entry of the JSON document in ``response.text``.

        When the text cannot be decoded, ``{"errors": response.content}`` is
        returned instead.
        """
        try:
            document = json.loads(response.text)
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}
        else:
            return document.get("errors")


class CommandBaseBackend:
    """Common base for the command-driven backends.

    Holds a ``session`` and a ``target``; both start out as ``None``.
    """

    def __init__(self):
        self.session = self.target = None

    def get_session(self):
        """Return the session object held by this backend."""
        current_session = self.session
        return current_session

    def get_base_url(self):
        """Return the base URL, which is always the empty string here."""
        return ""

    def get_serializer(self):
        """Return a new ``JsonSimpleSerializer``."""
        serializer = JsonSimpleSerializer()
        return serializer
07070100000018000081A4000000000000000000000001675E3B1A00002EEB000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/backends/https.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"


import io
import json
import logging
import os
import platform
import sys
import time
from shlex import split as shell_split

from proxmoxer.core import SERVICES, AuthenticationError, config_failure

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)

STREAMING_SIZE_THRESHOLD = 10 * 1024 * 1024  # 10 MiB
SSL_OVERFLOW_THRESHOLD = 2147483135  # 2^31 - 1 - 512

try:
    import requests
    from requests.auth import AuthBase
    from requests.cookies import cookiejar_from_dict

    # Disable warnings about using untrusted TLS
    requests.packages.urllib3.disable_warnings()
except ImportError:
    logger.error("Chosen backend requires 'requests' module\n")
    sys.exit(1)


class ProxmoxHTTPAuthBase(AuthBase):
    """Shared base of the HTTP auth handlers.

    Stores the connection settings and provides defaults that leave the request
    untouched and report no cookies and no tokens.
    """

    def __init__(self, timeout=5, service="PVE", verify_ssl=False, cert=None):
        self.cert = cert
        self.verify_ssl = verify_ssl
        self.service = service
        self.timeout = timeout

    def __call__(self, req):
        # nothing is attached to the request by default
        return req

    def get_cookies(self):
        """Return an empty cookie jar."""
        empty_jar = cookiejar_from_dict({})
        return empty_jar

    def get_tokens(self):
        """Return ``(None, None)``: there is no ticket and no CSRF token."""
        return (None, None)


class ProxmoxHTTPAuth(ProxmoxHTTPAuthBase):
    """Ticket-based authentication using a username and password.

    A ticket and CSRF token are requested when the object is created and are
    requested again once the ticket is older than ``renew_age`` seconds.
    """

    # number of seconds between renewing access tickets (must be less than 7200 to function correctly)
    # if calls are made less frequently than 2 hrs, using the API token auth is recommended
    renew_age = 3600

    def __init__(self, username, password, otp=None, base_url="", **kwargs):
        super().__init__(**kwargs)
        self.username = username
        self.base_url = base_url
        self.pve_auth_ticket = ""

        self._get_new_tokens(password=password, otp=otp)

    def _get_new_tokens(self, password=None, otp=None):
        """Request a ticket and CSRF token and store them on the instance.

        :param password: password to log in with; when ``None`` the currently
            stored ticket is sent in its place
        :param otp: optional one-time password, only sent when truthy
        :raises AuthenticationError: when the reply carries no data or asks for
            two factor authentication
        """
        # refresh from existing (unexpired) ticket when no password is given
        secret = self.pve_auth_ticket if password is None else password

        payload = {"username": self.username, "password": secret}
        if otp:
            payload["otp"] = otp

        ticket_url = self.base_url + "/access/ticket"
        reply = requests.post(
            ticket_url,
            verify=self.verify_ssl,
            timeout=self.timeout,
            data=payload,
            cert=self.cert,
        )
        ticket_info = reply.json()["data"]

        if ticket_info is None:
            raise AuthenticationError(
                "Couldn't authenticate user: {0} to {1}".format(self.username, ticket_url)
            )
        needs_tfa = ticket_info.get("NeedTFA") is not None
        if needs_tfa:
            raise AuthenticationError(
                "Couldn't authenticate user: missing Two Factor Authentication (TFA)"
            )

        # remember when the ticket was issued so __call__ can decide when to renew it
        self.birth_time = time.monotonic()
        self.pve_auth_ticket = ticket_info["ticket"]
        self.csrf_prevention_token = ticket_info["CSRFPreventionToken"]

    def get_cookies(self):
        """Return a cookie jar holding the ticket under ``<service>AuthCookie``."""
        cookie_name = self.service + "AuthCookie"
        return cookiejar_from_dict({cookie_name: self.pve_auth_ticket})

    def get_tokens(self):
        """Return the current ``(ticket, CSRF token)`` pair."""
        return self.pve_auth_ticket, self.csrf_prevention_token

    def __call__(self, req):
        # refresh ticket if older than `renew_age`
        time_diff = time.monotonic() - self.birth_time
        if not time_diff < self.renew_age:
            logger.debug(f"refreshing ticket (age {time_diff})")
            self._get_new_tokens()

        # only attach CSRF token if needed (reduce interception risk)
        if req.method == "GET":
            return req
        req.headers["CSRFPreventionToken"] = self.csrf_prevention_token
        return req


class ProxmoxHTTPApiTokenAuth(ProxmoxHTTPAuthBase):
    """Authentication that sends an API token in the ``Authorization`` header."""

    def __init__(self, username, token_name, token_value, **kwargs):
        super().__init__(**kwargs)
        self.username, self.token_name, self.token_value = username, token_name, token_value

    def __call__(self, req):
        # the character between token name and value depends on the service
        header_fields = (
            self.service,
            self.username,
            self.token_name,
            SERVICES[self.service]["token_separator"],
            self.token_value,
        )
        req.headers["Authorization"] = "{0}APIToken={1}!{2}{3}{4}".format(*header_fields)
        req.cert = self.cert
        return req


class JsonSerializer:
    """Decode JSON API responses and advertise the accepted content types."""

    content_types = [
        "application/json",
        "application/x-javascript",
        "text/javascript",
        "text/x-javascript",
        "text/x-json",
    ]

    def get_accept_types(self):
        """Return ``content_types`` joined with ``", "``."""
        separator = ", "
        return separator.join(self.content_types)

    def loads(self, response):
        """Return the ``data`` entry of the UTF-8 JSON document in ``response.content``.

        When the content cannot be decoded, ``{"errors": response.content}`` is
        returned instead.
        """
        try:
            document_text = response.content.decode("utf-8")
            document = json.loads(document_text)
            return document["data"]
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}

    def loads_errors(self, response):
        """Return the ``errors`` entry of the JSON document in ``response.text``.

        When the text cannot be decoded, ``{"errors": response.content}`` is
        returned instead.
        """
        try:
            document = json.loads(response.text)
        except (UnicodeDecodeError, ValueError):
            return {"errors": response.content}
        else:
            return document.get("errors")


# pylint:disable=arguments-renamed
class ProxmoxHttpSession(requests.Session):
    """``requests.Session`` that fills request defaults from the auth object and
    prepares file uploads."""

    def request(
        self,
        method,
        url,
        params=None,
        data=None,
        headers=None,
        cookies=None,
        files=None,
        auth=None,
        timeout=None,
        allow_redirects=True,
        proxies=None,
        hooks=None,
        stream=None,
        verify=None,
        cert=None,
        serializer=None,
    ):
        """Send a request, see ``requests.Session.request`` for the shared parameters.

        Differences to the parent implementation:

        * ``verify``, ``timeout`` and ``cookies`` default to the values offered by
          the auth object when it provides them
        * file objects found in ``data`` are moved to ``files``
        * the ``command`` of a qemu ``agent/exec`` call is split into a list
        * payloads above ``STREAMING_SIZE_THRESHOLD`` are streamed through
          ``requests_toolbelt`` when it is installed

        ``data`` and ``files`` passed by the caller are not modified.

        :param serializer: accepted for interface compatibility, unused
        :raises OverflowError: when the files exceed ``SSL_OVERFLOW_THRESHOLD`` and
            ``requests_toolbelt`` is not installed
        """
        a = auth or self.auth
        c = cookies or self.cookies

        # set verify flag from auth if request does not have this parameter explicitly
        # (getattr: there may be no auth object, or one without these attributes)
        if verify is None:
            verify = getattr(a, "verify_ssl", None)

        if timeout is None:
            timeout = getattr(a, "timeout", None)

        # pull cookies from auth if not present
        if (not c) and a and hasattr(a, "get_cookies"):
            cookies = a.get_cookies()

        # filter out streams; copies keep the caller's dictionaries untouched
        files = dict(files) if files else {}
        data = dict(data) if data else {}
        total_file_size = 0
        for k, v in list(data.items()):
            # split qemu exec commands for proper parsing by PVE (issue#89)
            if k == "command" and url.endswith("agent/exec"):
                # lists are sent as they are; strings are only split outside Windows
                if not isinstance(v, list) and "Windows" not in platform.platform():
                    data[k] = shell_split(v)
            if isinstance(v, io.IOBase):
                total_file_size += get_file_size(v)

                # add in filename from file pointer (patch for https://github.com/requests/toolbelt/pull/316)
                # add Content-Type since Proxmox requires it (https://bugzilla.proxmox.com/show_bug.cgi?id=4344)
                files[k] = (requests.utils.guess_filename(v), v, "application/octet-stream")
                del data[k]

        # if there are any large files, send all data and files using streaming multipart encoding
        if total_file_size > STREAMING_SIZE_THRESHOLD:
            try:
                # pylint:disable=import-outside-toplevel
                from requests_toolbelt import MultipartEncoder
            except ImportError:
                # if the files will cause issues with the SSL 2GiB limit (https://bugs.python.org/issue42853#msg384566)
                if total_file_size > SSL_OVERFLOW_THRESHOLD:
                    logger.warning(
                        "Install 'requests_toolbelt' to add support for files larger than 2GiB"
                    )
                    raise OverflowError("Unable to upload a payload larger than 2 GiB")
                logger.info(
                    "Installing 'requests_toolbelt' will decrease memory used during upload"
                )
            else:
                encoder = MultipartEncoder(fields={**data, **files})
                data = encoder
                files = None
                # keep the caller's headers; only the content type is replaced
                headers = {
                    name: value
                    for name, value in (headers or {}).items()
                    if name.lower() != "content-type"
                }
                headers["Content-Type"] = encoder.content_type

        return super().request(
            method,
            url,
            params,
            data,
            headers,
            cookies,
            files,
            auth,
            timeout,
            allow_redirects,
            proxies,
            hooks,
            stream,
            verify,
            cert,
        )


class Backend:
    """HTTPS backend: works out the API base URL and creates the auth handler."""

    def __init__(
        self,
        host,
        user=None,
        password=None,
        otp=None,
        port=None,
        verify_ssl=True,
        mode="json",
        timeout=5,
        token_name=None,
        token_value=None,
        path_prefix=None,
        service="PVE",
        cert=None,
    ):
        """
        :param host: host name or address, optionally followed by ``:port``; IPv6
            addresses may be given with or without brackets
        :param port: port to use when ``host`` carries none; falls back to the
            service's default port
        :param path_prefix: optional path segment placed before ``api2``
        :param token_name: when given, API token authentication is used
        :param password: when given (and no token name), ticket authentication is used
        """
        self.cert = cert

        # split an optional port off the host string
        embedded_port = ""
        colon_parts = host.split(":")
        if len(colon_parts) > 2:  # IPv6
            if not host.startswith("["):
                # bare address: add the brackets
                host = f"[{host}]"
            elif "]:" in host:
                host, embedded_port = host.rsplit(":", 1)
        elif ":" in host:
            host, embedded_port = host.split(":")
        if embedded_port.isdigit():
            port = embedded_port

        # if a port is not specified, use the default port for this service
        if not port:
            port = SERVICES[service]["default_port"]

        self.mode = mode
        if path_prefix is None:
            self.base_url = f"https://{host}:{port}/api2/{mode}"
        else:
            self.base_url = f"https://{host}:{port}/{path_prefix}/api2/{mode}"

        # settings handed to whichever auth handler gets created
        auth_settings = {
            "verify_ssl": verify_ssl,
            "timeout": timeout,
            "service": service,
            "cert": self.cert,
        }

        if token_name is not None:
            if "token" not in SERVICES[service]["supported_https_auths"]:
                config_failure("{} does not support API Token authentication", service)

            self.auth = ProxmoxHTTPApiTokenAuth(user, token_name, token_value, **auth_settings)
        elif password is not None:
            if "password" not in SERVICES[service]["supported_https_auths"]:
                config_failure("{} does not support password authentication", service)

            self.auth = ProxmoxHTTPAuth(
                user, password, otp, base_url=self.base_url, **auth_settings
            )
        else:
            config_failure("No valid authentication credentials were supplied")

    def get_session(self):
        """Return a new ``ProxmoxHttpSession`` set up with this backend's auth and cert."""
        http_session = ProxmoxHttpSession()
        http_session.cert = self.cert
        http_session.auth = self.auth
        # cookies are taken from the auth
        accept_types = self.get_serializer().get_accept_types()
        http_session.headers["Connection"] = "keep-alive"
        http_session.headers["accept"] = accept_types
        return http_session

    def get_base_url(self):
        """Return the API base URL computed in ``__init__``."""
        return self.base_url

    def get_serializer(self):
        """Return a ``JsonSerializer``; ``mode`` is asserted to be ``"json"``."""
        assert self.mode == "json"
        return JsonSerializer()

    def get_tokens(self):
        """Return the in-use auth and csrf tokens if using user/password auth."""
        auth_handler = self.auth
        return auth_handler.get_tokens()


def get_file_size(file_obj):
    """Return the total number of bytes in the given file object.

    The cursor is moved to the end to take the measurement and is then put back
    to where it was when the function was called.

    :param file_obj: file object to measure
    :type file_obj: file object
    :return: total bytes in file object
    :rtype: int
    """
    # remember the position so it can be restored
    original_position = file_obj.tell()

    # the position at the end of the file equals its size
    file_obj.seek(0, os.SEEK_END)
    total_bytes = file_obj.tell()

    file_obj.seek(original_position)
    return total_bytes


def get_file_size_partial(file_obj):
    """Return the number of bytes between the current cursor and the end of the file.

    The cursor is back at its starting position when the function returns.

    :param file_obj: file object to measure
    :type file_obj: file object
    :return: remaining bytes in file object
    :rtype: int
    """
    # remember the position so it can be restored
    original_position = file_obj.tell()

    file_obj.seek(0, os.SEEK_END)
    end_position = file_obj.tell()

    # bytes left = distance from the starting cursor to the end
    remaining_bytes = end_position - original_position

    file_obj.seek(original_position)
    return remaining_bytes
07070100000019000081A4000000000000000000000001675E3B1A00000307000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/backends/local.py__author__ = "Markus Reiter"
__copyright__ = "(c) Markus Reiter 2022"
__license__ = "MIT"

import shutil
from subprocess import PIPE, Popen

from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession


class LocalSession(CommandBaseSession):
    """Session that runs the commands as subprocesses on the local machine."""

    def _exec(self, cmd):
        """Run ``cmd`` and return its decoded ``(stdout, stderr)``.

        :param cmd: command and arguments as a list
        :raises subprocess.TimeoutExpired: when the command runs longer than
            ``self.timeout``; the child process is killed before re-raising
        """
        # function-scope import: the module only imports PIPE and Popen
        from subprocess import TimeoutExpired

        proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
        try:
            stdout, stderr = proc.communicate(timeout=self.timeout)
        except TimeoutExpired:
            # communicate() does not stop the child on timeout; kill it and drain
            # the pipes so no running process or open pipe is left behind
            proc.kill()
            proc.communicate()
            raise
        return stdout.decode(), stderr.decode()

    def upload_file_obj(self, file_obj, remote_path):
        """Copy the contents of ``file_obj`` into the local file ``remote_path``."""
        with open(remote_path, "wb") as dest_fp:
            shutil.copyfileobj(file_obj, dest_fp)


class Backend(CommandBaseBackend):
    """Backend that runs commands through a ``LocalSession``."""

    def __init__(self, *args, **kwargs):
        # every argument is forwarded unchanged to the session
        local_session = LocalSession(*args, **kwargs)
        self.session = local_session
        self.target = "localhost"
0707010000001A000081A4000000000000000000000001675E3B1A000006DD000000000000000000000000000000000000002E00000000proxmoxer-2.2.0/proxmoxer/backends/openssh.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"

import logging

from proxmoxer.backends.command_base import (
    CommandBaseBackend,
    CommandBaseSession,
    shell_join,
)

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)

try:
    import openssh_wrapper
except ImportError:
    import sys

    logger.error("Chosen backend requires 'openssh_wrapper' module\n")
    sys.exit(1)


class OpenSSHSession(CommandBaseSession):
    """Session that runs the commands over an ``openssh_wrapper`` SSH connection."""

    def __init__(
        self,
        host,
        user,
        config_file=None,
        port=22,
        identity_file=None,
        forward_ssh_agent=False,
        **kwargs,
    ):
        """Store the connection settings and open the connection.

        Remaining keyword arguments are passed on to ``CommandBaseSession``.
        """
        super().__init__(**kwargs)
        self.host, self.user = host, user
        self.config_file, self.port = config_file, port
        self.forward_ssh_agent = forward_ssh_agent
        self.identity_file = identity_file

        self.ssh_client = self._connect()

    def _connect(self):
        """Create the ``openssh_wrapper.SSHConnection`` for the stored settings."""
        connection_options = {
            "login": self.user,
            "port": str(self.port),  # openssh_wrapper complains if this is an int
            "configfile": self.config_file,
            "identity_file": self.identity_file,
            "timeout": self.timeout,
        }
        return openssh_wrapper.SSHConnection(self.host, **connection_options)

    def _exec(self, cmd):
        """Run ``cmd`` (joined into one shell string) and return ``(stdout, stderr)``."""
        command_line = shell_join(cmd)
        result = self.ssh_client.run(command_line, forward_ssh_agent=self.forward_ssh_agent)
        return result.stdout, result.stderr

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` using the connection's ``scp``."""
        sources = (file_obj,)
        self.ssh_client.scp(sources, target=remote_path)


class Backend(CommandBaseBackend):
    """Backend that runs commands through an ``OpenSSHSession``."""

    def __init__(self, *args, **kwargs):
        # every argument is forwarded unchanged to the session
        ssh_session = OpenSSHSession(*args, **kwargs)
        self.session = ssh_session
        self.target = ssh_session.host
0707010000001B000081A4000000000000000000000001675E3B1A00000845000000000000000000000000000000000000003300000000proxmoxer-2.2.0/proxmoxer/backends/ssh_paramiko.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"

# spell-checker:ignore putfo

import logging
import os

from proxmoxer.backends.command_base import (
    CommandBaseBackend,
    CommandBaseSession,
    shell_join,
)

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)

try:
    import paramiko
except ImportError:
    import sys

    logger.error("Chosen backend requires 'paramiko' module\n")
    sys.exit(1)


class SshParamikoSession(CommandBaseSession):
    """Session that runs the commands over a paramiko SSH connection."""

    def __init__(self, host, user, password=None, private_key_file=None, port=22, **kwargs):
        """Store the connection settings and connect.

        Remaining keyword arguments are passed on to ``CommandBaseSession``.
        """
        super().__init__(**kwargs)
        self.host = host
        self.user = user
        self.password = password
        self.private_key_file = private_key_file
        self.port = port

        self.ssh_client = self._connect()

    def _connect(self):
        """Create and connect the ``paramiko.SSHClient`` for the stored settings."""
        ssh_client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts host keys that are not yet known
        # without verification — confirm this is acceptable for the deployment
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        if self.private_key_file:
            key_filename = os.path.expanduser(self.private_key_file)
        else:
            key_filename = None

        ssh_client.connect(
            self.host,
            username=self.user,
            # the agent is only consulted when no password was supplied
            allow_agent=(not self.password),
            look_for_keys=True,
            key_filename=key_filename,
            password=self.password,
            timeout=self.timeout,
            port=self.port,
        )

        return ssh_client

    def _exec(self, cmd):
        """Run ``cmd`` (joined into one shell string) and return decoded ``(stdout, stderr)``."""
        session = self.ssh_client.get_transport().open_session()
        try:
            session.exec_command(shell_join(cmd))
            stdout = session.makefile("rb", -1).read().decode()
            stderr = session.makefile_stderr("rb", -1).read().decode()
        finally:
            # release the channel even when running the command or reading fails
            session.close()
        return stdout, stderr

    def upload_file_obj(self, file_obj, remote_path):
        """Copy ``file_obj`` to ``remote_path`` over SFTP."""
        sftp = self.ssh_client.open_sftp()
        try:
            sftp.putfo(file_obj, remote_path)
        finally:
            # close the SFTP client even when the transfer fails
            sftp.close()


class Backend(CommandBaseBackend):
    """Backend that runs commands through an ``SshParamikoSession``."""

    def __init__(self, *args, **kwargs):
        # every argument is forwarded unchanged to the session
        paramiko_session = SshParamikoSession(*args, **kwargs)
        self.session = paramiko_session
        self.target = paramiko_session.host
0707010000001C000081A4000000000000000000000001675E3B1A00001EC6000000000000000000000000000000000000002200000000proxmoxer-2.2.0/proxmoxer/core.py__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"

# spell-checker:ignore urlunsplit

import importlib
import logging
import posixpath
from http import client as httplib
from urllib import parse as urlparse

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)


# Descriptions for the 595-599 status codes, looked up when a status code is not
# present in the standard library's HTTP responses table.
# https://metacpan.org/pod/AnyEvent::HTTP
ANYEVENT_HTTP_STATUS_CODES = {
    595: "Errors during connection establishment, proxy handshake",
    596: "Errors during TLS negotiation, request sending and header processing",
    597: "Errors during body receiving or processing",
    598: "User aborted request via on_header or on_body",
    599: "Other, usually nonretryable, errors (garbled URL etc.)",
}

# Per-service settings, keyed by upper-case service name:
#   supported_backends     - backend names accepted for the service
#   supported_https_auths  - auth kinds ("password", "token") the https backend allows
#   default_port           - port used by the https backend when none is given
#   token_separator        - text placed between API token name and value (only
#                            present for services that list "token" auth)
#   cli_additional_options - extra arguments appended to every CLI command
#                            (optional; treated as empty when absent)
SERVICES = {
    "PVE": {
        "supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
        "supported_https_auths": ["password", "token"],
        "default_port": 8006,
        "token_separator": "=",
        "cli_additional_options": ["--output-format", "json"],
    },
    "PMG": {
        "supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
        "supported_https_auths": ["password"],
        "default_port": 8006,
    },
    "PBS": {
        "supported_backends": ["https"],
        "supported_https_auths": ["password", "token"],
        "default_port": 8007,
        "token_separator": ":",
    },
}


def config_failure(message, *args):
    """Raise a ``NotImplementedError`` describing a configuration problem.

    :param message: ``str.format`` template of the error text
    :param args: positional values substituted into ``message``
    :raises NotImplementedError: always
    """
    details = message.format(*args)
    raise NotImplementedError(details)


class ResourceException(Exception):
    """
    Raised when a Proxmox API call fails
    """

    def __init__(self, status_code, status_message, content, errors=None):
        """
        Build the exception and its text ``"<code> <message>: <content>[ - <errors>]"``

        :param status_code: The HTTP status code (faked by non-HTTP backends)
        :type status_code: int
        :param status_message: Text describing the status code (faked by non-HTTP backends)
        :type status_message: str
        :param content: Extended information on what went wrong
        :type content: str
        :param errors: Any specific errors that were encountered (converted to string), defaults to None
        :type errors: Optional[object], optional
        """
        self.status_code, self.status_message = status_code, status_message
        self.content, self.errors = content, errors

        # the errors are appended to the exception text only; self.content keeps
        # the value that was passed in
        has_errors = errors is not None
        if has_errors:
            content += f" - {errors}"
        message = f"{status_code} {status_message}: {content}".strip()
        super().__init__(message)


class AuthenticationError(Exception):
    """Raised when authenticating against the API fails."""


class ProxmoxResource:
    """One node of the API path tree.

    Attribute access and calls return new ``ProxmoxResource`` objects whose
    ``base_url`` has further path segments appended. ``get``/``post``/``put``/
    ``delete`` (and the ``create``/``set`` aliases) send a request for the
    resulting URL through the stored session.
    """

    def __init__(self, **kwargs):
        # entries read by this class: "base_url", "session" and "serializer"
        self._store = kwargs

    def __repr__(self):
        return f"ProxmoxResource ({self._store.get('base_url')})"

    def __getattr__(self, item):
        # names starting with "_" must fail like ordinary missing attributes
        # instead of turning into URL segments
        if item.startswith("_"):
            raise AttributeError(item)

        kwargs = self._store.copy()
        kwargs["base_url"] = self.url_join(self._store["base_url"], item)

        return ProxmoxResource(**kwargs)

    def url_join(self, base, *args):
        """Return ``base`` with ``args`` (stringified) appended as path segments."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(base)
        path = path if len(path) else "/"
        path = posixpath.join(path, *[str(x) for x in args])
        return urlparse.urlunsplit([scheme, netloc, path, query, fragment])

    def __call__(self, resource_id=None):
        """Return the resource addressed by ``resource_id`` below this one.

        :param resource_id: ``None``/empty (returns ``self``), a ``"/"``-separated
            string or bytes path, a tuple/list of segments, or any other object
            (used as a single stringified segment)
        """
        # bytes are decoded first: bytes.split("/") would raise TypeError
        if isinstance(resource_id, bytes):
            resource_id = resource_id.decode("utf-8")

        if resource_id in (None, ""):
            return self

        if isinstance(resource_id, str):
            segments = resource_id.split("/")
        elif isinstance(resource_id, (tuple, list)):
            segments = resource_id
        else:
            segments = [str(resource_id)]

        kwargs = self._store.copy()
        kwargs["base_url"] = self.url_join(self._store["base_url"], *segments)

        return ProxmoxResource(**kwargs)

    def _request(self, method, data=None, params=None):
        """Send ``method`` to this resource's URL and return the decoded reply.

        :param data: body values; entries set to ``None`` are left out
        :param params: query values; entries set to ``None`` are left out
        :return: the serializer's result for 2xx replies, ``None`` for any other
            status code below 400
        :raises ResourceException: for status codes of 400 and above
        """
        url = self._store["base_url"]
        if data:
            logger.info(f"{method} {url} {data}")
        else:
            logger.info(f"{method} {url}")

        # passing None values to pvesh command breaks it, let's remove them just as requests library does
        # helpful when dealing with function default values higher in the chain, no need to clean up in multiple places
        # (new dictionaries are built so the caller's ones stay untouched)
        if params:
            params = {k: v for (k, v) in params.items() if v is not None}

        if data:
            data = {k: v for (k, v) in data.items() if v is not None}

        resp = self._store["session"].request(method, url, data=data, params=params)
        logger.debug(f"Status code: {resp.status_code}, output: {resp.content!r}")

        if resp.status_code >= 400:
            # standard HTTP reason first, then the extra 59x descriptions
            status_message = httplib.responses.get(
                resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code)
            )
            if hasattr(resp, "reason"):
                raise ResourceException(
                    resp.status_code,
                    status_message,
                    resp.reason,
                    errors=(self._store["serializer"].loads_errors(resp)),
                )
            # responses without a ``reason`` attribute only provide their text
            raise ResourceException(resp.status_code, status_message, resp.text)

        if 200 <= resp.status_code <= 299:
            return self._store["serializer"].loads(resp)

        # any other status code (below 200 or 3xx) yields no result
        return None

    def get(self, *args, **params):
        """Send a GET to the resource addressed by ``args`` with ``params`` as query."""
        return self(args)._request("GET", params=params)

    def post(self, *args, **data):
        """Send a POST to the resource addressed by ``args`` with ``data`` as body."""
        return self(args)._request("POST", data=data)

    def put(self, *args, **data):
        """Send a PUT to the resource addressed by ``args`` with ``data`` as body."""
        return self(args)._request("PUT", data=data)

    def delete(self, *args, **params):
        """Send a DELETE to the resource addressed by ``args`` with ``params`` as query."""
        return self(args)._request("DELETE", params=params)

    def create(self, *args, **data):
        """Alias of :meth:`post`."""
        return self.post(*args, **data)

    def set(self, *args, **data):
        """Alias of :meth:`put`."""
        return self.put(*args, **data)


class ProxmoxAPI(ProxmoxResource):
    """Root resource: validates the service/backend pair and loads the backend."""

    def __init__(self, host=None, backend="https", service="PVE", **kwargs):
        """
        :param host: host to connect to; not accepted by the ``local`` backend
        :param backend: backend name (case-insensitive)
        :param service: service name (case-insensitive)
        :param kwargs: further arguments handed to the backend's ``Backend`` class
        """
        super().__init__(**kwargs)
        service, backend = service.upper(), backend.lower()

        # unknown services are rejected first
        if service not in SERVICES:
            config_failure("{} service is not supported", service)

        # then backends the service does not offer
        allowed_backends = SERVICES[service]["supported_backends"]
        if backend not in allowed_backends:
            config_failure("{} service does not support {} backend", service, backend)

        host_given = host is not None
        if host_given and backend == "local":
            config_failure("{} backend does not support host keyword", backend)
        elif host_given:
            kwargs["host"] = host

        kwargs["service"] = service

        # load backend module
        backend_module = importlib.import_module(f".backends.{backend}", "proxmoxer")
        self._backend = backend_module.Backend(**kwargs)
        self._backend_name = backend

        self._store = {
            "base_url": self._backend.get_base_url(),
            "session": self._backend.get_session(),
            "serializer": self._backend.get_serializer(),
        }

    def __repr__(self):
        # backends without a ``target`` attribute are described by their base URL
        dest = getattr(self._backend, "target", self._store.get("base_url"))
        return f"ProxmoxAPI ({self._backend_name} backend for {dest})"

    def get_tokens(self):
        """Return the auth and csrf tokens.

        Returns (None, None) if the backend is not https using password authentication.
        """
        if self._backend_name == "https":
            return self._backend.get_tokens()

        return None, None
0707010000001D000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000002000000000proxmoxer-2.2.0/proxmoxer/tools0707010000001E000081A4000000000000000000000001675E3B1A000000D0000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/proxmoxer/tools/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

from . import *  # noqa: F401 F403
from .files import *  # noqa: F401 F403
from .tasks import *  # noqa: F401 F403
0707010000001F000081A4000000000000000000000001675E3B1A000028DA000000000000000000000000000000000000002900000000proxmoxer-2.2.0/proxmoxer/tools/files.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"

import hashlib
import logging
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin, urlparse

from proxmoxer import ProxmoxResource, ResourceException
from proxmoxer.tools.tasks import Tasks

CHECKSUM_CHUNK_SIZE = 16384  # read 16k at a time while calculating the checksum for upload

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)

try:
    import requests
except ImportError:
    logger.error("Files tools requires 'requests' module\n")
    sys.exit(1)


class ChecksumInfo:
    """Name of a checksum type together with its size in hex digits."""

    def __init__(self, name: str, hex_size: int):
        self.name, self.hex_size = name, hex_size

    def __str__(self):
        # the plain string form is just the name
        checksum_name = self.name
        return checksum_name

    def __repr__(self):
        return f"{self.name} ({self.hex_size} digits)"


class SupportedChecksums(Enum):
    """
    An Enum of the checksum types supported by Proxmox

    Each member's value is a ``ChecksumInfo`` holding the checksum name and its
    size in hex digits.
    """

    # ordered by preference for longer/stronger checksums first
    # NOTE(review): SHA224 is listed before SHA384 and MD5 before SHA1, so the
    # order is not strictly by digest length — confirm whether that is intended
    SHA512 = ChecksumInfo("sha512", 128)
    SHA256 = ChecksumInfo("sha256", 64)
    SHA224 = ChecksumInfo("sha224", 56)
    SHA384 = ChecksumInfo("sha384", 96)
    MD5 = ChecksumInfo("md5", 32)
    SHA1 = ChecksumInfo("sha1", 40)


class Files:
    """
    Ease-of-use tools for interacting with the uploading/downloading files
    in Proxmox VE
    """

    def __init__(self, prox: ProxmoxResource, node: str, storage: str):
        """
        Remember the API object, node name and storage name used by the other methods

        :param prox: the Proxmox object used for API requests
        :type prox: ProxmoxResource
        :param node: the name of the node to operate on
        :type node: str
        :param storage: the name of the storage on that node
        :type storage: str
        """
        self._prox, self._node, self._storage = prox, node, storage

    def __repr__(self):
        """Describe this helper as ``Files (<node>/<storage> at <prox>)``"""
        return "Files ({}/{} at {})".format(self._node, self._storage, self._prox)

    def upload_local_file_to_storage(
        self,
        filename: str,
        do_checksum_check: bool = True,
        blocking_status: bool = True,
    ):
        """
        Upload a file from the local filesystem to the configured storage

        :param filename: path of the local file to upload
        :type filename: str
        :param do_checksum_check: compute a checksum of the file locally and send it with the upload, defaults to True
        :type do_checksum_check: bool, optional
        :param blocking_status: wait for the upload task via Tasks.blocking_status instead of returning its current status, defaults to True
        :type blocking_status: bool, optional
        :return: the status of the upload task, or None if the path is not a file or an OSError occurs
        :rtype: dict | None
        """
        file_path = Path(filename)

        if not file_path.is_file():
            logger.error(f'"{file_path.absolute()}" does not exist or is not a file')
            return None

        # values sent when no checksum is calculated
        upid: str = ""
        checksum: str = None
        checksum_type: str = None

        try:
            with open(file_path.absolute(), "rb") as f_obj:
                if do_checksum_check:
                    # first SupportedChecksums entry (in declaration order) that hashlib provides
                    checksum_type = next(
                        (
                            member.value.name
                            for member in SupportedChecksums
                            if member.value.name in hashlib.algorithms_available
                        ),
                        None,
                    )

                    if checksum_type is not None:
                        digest = hashlib.new(checksum_type)

                        # hash the file CHECKSUM_CHUNK_SIZE bytes at a time
                        for byte_block in iter(lambda: f_obj.read(CHECKSUM_CHUNK_SIZE), b""):
                            digest.update(byte_block)
                        checksum = digest.hexdigest()
                        logger.debug(
                            f"The {checksum_type} checksum of {file_path.absolute()} is {checksum}"
                        )

                        # rewind so the upload reads the same handle from the beginning
                        f_obj.seek(0)
                    else:
                        logger.warning(
                            "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation"
                        )

                content_type = "iso" if file_path.absolute().name.endswith("iso") else "vztmpl"
                storage_api = self._prox.nodes(self._node).storage(self._storage)
                upid = storage_api.upload.post(
                    **{
                        "content": content_type,
                        "checksum-algorithm": checksum_type,
                        "checksum": checksum,
                        "filename": f_obj,
                    }
                )
        except OSError as e:
            logger.error(e)
            return None

        if not blocking_status:
            return self._prox.nodes(self._node).tasks(upid).status.get()
        return Tasks.blocking_status(self._prox, upid)

    def download_file_to_storage(
        self,
        url: str,
        checksum: Optional[str] = None,
        checksum_type: Optional[str] = None,
        blocking_status: bool = True,
    ):
        """
        Ask the node to download a file from a URL directly into the configured storage

        :param url: the URL of the file to download
        :type url: str
        :param checksum: the expected checksum; leave None (together with checksum_type) for auto-discovery
        :type checksum: str | None
        :param checksum_type: the algorithm name of the checksum; leave None (together with checksum) for auto-discovery
        :type checksum_type: str | None
        :param blocking_status: wait for the download task via Tasks.blocking_status instead of returning its current status, defaults to True
        :type blocking_status: bool, optional
        :return: the status of the download task, or None if only one of checksum/checksum_type was given
        :rtype: dict | None
        """
        file_info = self.get_file_info(url)
        filename = file_info.get("filename") if file_info is not None else None

        checksum_given = checksum is not None
        type_given = checksum_type is not None

        if checksum_given != type_given:
            # exactly one of the pair was supplied
            logger.error(
                "Must pass both checksum and checksum_type or leave both None for auto-discovery"
            )
            return None

        if not checksum_given:
            # neither supplied: try to discover a published checksum
            checksum, checksum_info = self.get_checksums_from_file_url(url, filename)
            checksum_type = checksum_info.name if checksum_info else None

        if checksum is None or checksum_type is None:
            logger.warning("Unable to discover checksum. Will not do checksum validation")

        content_type = "iso" if url.endswith("iso") else "vztmpl"
        storage_api = self._prox.nodes(self._node).storage(self._storage)
        upid = storage_api("download-url").post(
            **{
                "checksum-algorithm": checksum_type,
                "url": url,
                "checksum": checksum,
                "content": content_type,
                "filename": filename,
            }
        )

        if not blocking_status:
            return self._prox.nodes(self._node).tasks(upid).status.get()
        return Tasks.blocking_status(self._prox, upid)

    def get_file_info(self, url: str):
        """
        Query the node's "query-url-metadata" endpoint for information about a URL

        :param url: the URL to look up
        :type url: str
        :return: the API response, or None if the API raised a ResourceException
        """
        try:
            metadata = self._prox.nodes(self._node)("query-url-metadata").get(url=url)
        except ResourceException as e:
            logger.warning(f"Unable to get information for {url}: {e}")
            return None
        return metadata

    @staticmethod
    def get_checksums_from_file_url(
        url: str, filename: str = None, preferred_type=SupportedChecksums.SHA512.value
    ):
        """
        Try to discover a published checksum for the file at a URL

        Every checksum type is tried (preferred_type first), and for each type every
        discovery method is tried, until one yields a checksum.

        :param url: the URL string of the target file
        :type url: str
        :param filename: the filename to search for in checksum files. If None, it is derived from the url
        :type filename: str | None
        :param preferred_type: the checksum type to try before all others, defaults to SHA512
        :type preferred_type: ChecksumInfo
        :return: a tuple of (checksum, ChecksumInfo), or (None, None) if nothing was found
        :rtype: tuple
        """
        # discovery methods, tried in this order for each checksum type
        getters_by_quality = (
            Files._get_checksum_from_sibling_file,
            Files._get_checksum_from_extension,
            Files._get_checksum_from_extension_upper,
        )

        # dict keys keep insertion order and drop duplicates, so preferred_type
        # comes first and is not tried a second time
        types_in_order = dict.fromkeys(
            [preferred_type] + [member.value for member in SupportedChecksums]
        )

        for c_info in types_in_order:
            for getter in getters_by_quality:
                checksum: str = getter(url, c_info, filename)
                if checksum is None:
                    logger.debug(f"{getter} found no {str(c_info)} checksum")
                    continue
                logger.info(f"{getter} found {str(c_info)} checksum {checksum}")
                return (checksum, c_info)

        return (None, None)

    @staticmethod
    def _get_checksum_from_sibling_file(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Looks for the checksum in a "<ALGORITHM>SUMS" file (e.g. SHA512SUMS) located
        in the same path as the target file

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        sums_name = checksum_info.name.upper() + "SUMS"
        sumfile_url = urljoin(url, sums_name)
        if not filename:
            # fall back to the last path component of the URL
            filename = os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Looks for the checksum in a file named like the target with the checksum
        name appended as an extension (e.g. file.iso.sha512)

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        sumfile_url = ".".join((url, checksum_info.name))
        if not filename:
            # fall back to the last path component of the URL
            filename = os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension_upper(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Looks for the checksum in a file named like the target with the upper-cased
        checksum name appended as an extension (e.g. file.iso.SHA512)

        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        sumfile_url = ".".join((url, checksum_info.name.upper()))
        if not filename:
            # fall back to the last path component of the URL
            filename = os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _line_names_file(line: str, filename: str) -> bool:
        """
        Checks whether a checksum-file line refers to exactly the given filename

        A plain substring test would also accept lines for other files whose name merely
        contains the target name (e.g. "otherfile.iso" or "file.iso.zsync" for "file.iso").
        The name must therefore be delimited: preceded by the line start, whitespace,
        "*" (binary-mode marker), "/" or "(", and followed by the line end, whitespace or ")".

        :param line: one decoded line of the checksum file
        :type line: str
        :param filename: the filename to look for
        :type filename: str
        :return: True if the line names the file, else False
        :rtype: bool
        """
        start = line.find(filename)
        while start != -1:
            end = start + len(filename)
            delimited_before = start == 0 or line[start - 1] in " \t*/("
            delimited_after = end == len(line) or line[end] in " \t)"
            if delimited_before and delimited_after:
                return True
            start = line.find(filename, start + 1)
        return False

    @staticmethod
    def _get_checksum_helper(sumfile_url: str, filename: str, checksum_info: ChecksumInfo):
        """
        Downloads a checksum file and extracts the checksum listed for a filename

        :param sumfile_url: the URL of the checksum file
        :type sumfile_url: str
        :param filename: the filename whose line should be located in the checksum file
        :type filename: str
        :param checksum_info: the type of checksum; its hex_size is the number of leading characters returned
        :type checksum_info: ChecksumInfo
        :return: the first hex_size characters of the first line naming the file, or None if the
            request fails, does not return HTTP 200, or no line names the file
        :rtype: str | None
        """
        logger.debug(f"getting {sumfile_url}")
        try:
            resp = requests.get(sumfile_url, timeout=10)
        except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
            logger.info(f"Failed when trying to get {sumfile_url}")
            return None

        # an empty filename would "match" every line, so there is nothing meaningful to find
        if resp.status_code != 200 or not filename:
            return None

        for line in resp.iter_lines():
            # a 200 response is not guaranteed to be text; never fail on undecodable bytes
            line_str = line.decode("utf-8", errors="replace")
            logger.debug(f"checking for '{filename}' in '{line_str}'")
            if Files._line_names_file(line_str, filename):
                return line_str[0 : checksum_info.hex_size]
        return None
07070100000020000081A4000000000000000000000001675E3B1A00000A9D000000000000000000000000000000000000002900000000proxmoxer-2.2.0/proxmoxer/tools/tasks.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import time


class Tasks:
    """
    Ease-of-use tools for interacting with the tasks endpoints
    in the Proxmox API.
    """

    @staticmethod
    def blocking_status(prox, task_id, timeout=300, polling_interval=1):
        """
        Turns getting the status of a Proxmox task into a blocking call
        by polling the API until the task completes

        :param prox: The Proxmox object used to query for status
        :type prox: ProxmoxAPI
        :param task_id: the UPID of the task
        :type task_id: str
        :param timeout: If the task does not complete in this time (in seconds) return None, defaults to 300
        :type timeout: int, optional
        :param polling_interval: the time to wait between checking for status updates, defaults to 1
        :type polling_interval: float, optional
        :return: the status of the task
        :rtype: dict
        """
        node: str = Tasks.decode_upid(task_id)["node"]
        start_time: float = time.monotonic()
        data = {"status": ""}
        while data["status"] != "stopped":
            data = prox.nodes(node).tasks(task_id).status.get()
            if start_time + timeout <= time.monotonic():
                data = None  # type: ignore
                break

            time.sleep(polling_interval)
        return data

    @staticmethod
    def decode_upid(upid):
        """
        Decodes the sections of a UPID into separate fields

        :param upid: a UPID string
        :type upid: str
        :return: The decoded information from the UPID
        :rtype: dict
        """
        segments = upid.split(":")
        if segments[0] != "UPID" or len(segments) != 9:
            raise AssertionError("UPID is not in the correct format")

        data = {
            "upid": upid,
            "node": segments[1],
            "pid": int(segments[2], 16),
            "pstart": int(segments[3], 16),
            "starttime": int(segments[4], 16),
            "type": segments[5],
            "id": segments[6],
            "user": segments[7].split("!")[0],
            "comment": segments[8],
        }
        return data

    @staticmethod
    def decode_log(log_list):
        """
        Takes in a task's log data and returns a multiline string representation

        :param log_list: The log formatting returned by the Proxmox API
        :type log_list: list of dicts
        :return: a multiline string of the log
        :rtype: str
        """
        str_list = [""] * len(log_list)
        for line in log_list:
            str_list[line["n"] - 1] = line.get("t", "")

        return "\n".join(str_list)
07070100000021000081A4000000000000000000000001675E3B1A00000039000000000000000000000000000000000000001F00000000proxmoxer-2.2.0/pyproject.toml[tool.black]
line-length = 100
target-version = ['py37']
07070100000022000081A4000000000000000000000001675E3B1A000001D2000000000000000000000000000000000000001A00000000proxmoxer-2.2.0/setup.cfg[pylint]
max-line-length = 100
# allow single letter variables
variable-rgx = [a-z0-9_]{1,30}$

[pylint.messages_control]
# let black handle line length
# ignore some python3-only features (f-strings)
disable = C0330, C0326, C0114, line-too-long, missing-function-docstring, consider-using-f-string,missing-class-docstring

[flake8]
max-line-length = 100
extend-ignore = E203, E501, F811
exclude = .git,__pycache__,old,build,dist,*.egg-info

[isort]
profile = black
07070100000023000081A4000000000000000000000001675E3B1A00000823000000000000000000000000000000000000001900000000proxmoxer-2.2.0/setup.py#!/usr/bin/env python
import codecs
import os
import re
import sys

from setuptools import setup

from proxmoxer import __version__ as proxmoxer_version

# When building an sdist and no README.txt exists yet, derive one from README.rst,
# replacing each reST "code-block" directive with a plain "::" literal block marker.
if "sdist" in sys.argv and not os.path.exists("README.txt"):
    code_block = r"(:\n\n)?\.\. code-block::.*"
    with codecs.open("README.rst", encoding="utf8") as f:
        rst = re.sub(code_block, "::", f.read())
    with codecs.open("README.txt", encoding="utf8", mode="wb") as f:
        f.write(rst)


# Load the long description, preferring README.txt over README.rst.
# The file is opened in a `with` block so the handle is closed once it has been read.
try:
    readme = "README.txt" if os.path.exists("README.txt") else "README.rst"
    with codecs.open(readme, encoding="utf-8") as readme_file:
        long_description = readme_file.read()
except IOError:
    long_description = "Could not read README.txt"


# Package metadata; the version comes from proxmoxer.__version__ imported above.
setup(
    name="proxmoxer",
    version=proxmoxer_version,
    description="Python Wrapper for the Proxmox 2.x API (HTTP and SSH)",
    author="Oleg Butovich",
    author_email="obutovich@gmail.com",
    license="MIT",
    url="https://proxmoxer.github.io/docs/",
    download_url="http://pypi.python.org/pypi/proxmoxer",
    keywords=["proxmox", "api"],
    # every package is listed explicitly; no automatic package discovery is used
    packages=["proxmoxer", "proxmoxer.backends", "proxmoxer.tools"],
    classifiers=[  # http://pypi.python.org/pypi?%3Aaction=list_classifiers
        "Development Status :: 5 - Production/Stable",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: System :: Clustering",
        "Topic :: System :: Monitoring",
        "Topic :: System :: Systems Administration",
    ],
    # text read from README.txt / README.rst above, declared as reStructuredText
    long_description=long_description,
    long_description_content_type="text/x-rst",
)
07070100000024000081A4000000000000000000000001675E3B1A000000A0000000000000000000000000000000000000002600000000proxmoxer-2.2.0/test_requirements.txt# required libraries for full functionality
openssh_wrapper
paramiko
requests
requests_toolbelt

# used by test framework
coveralls
pytest
pytest-cov
responses
07070100000025000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001600000000proxmoxer-2.2.0/tests07070100000026000081A4000000000000000000000001675E3B1A0000005C000000000000000000000000000000000000002200000000proxmoxer-2.2.0/tests/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
07070100000027000081A4000000000000000000000001675E3B1A00002E22000000000000000000000000000000000000002200000000proxmoxer-2.2.0/tests/api_mock.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import json
import re
from urllib.parse import parse_qsl, urlparse

import pytest
import responses
from requests_toolbelt import MultipartEncoder


@pytest.fixture()
def mock_pve():
    """Yield an active ``responses`` mock that uses PVERegistry as its registry"""
    mocked = responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False)
    with mocked as rsps:
        yield rsps


class PVERegistry(responses.registries.FirstMatchRegistry):
    base_url = "https://1.2.3.4:1234/api2/json"

    common_headers = {
        "Cache-Control": "max-age=0",
        "Connection": "close, Keep-Alive",
        "Pragma": "no-cache",
        "Server": "pve-api-daemon/3.0",
        "Content-Type": "application/json;charset=UTF-8",
    }

    def __init__(self):
        """Register every static response, then every dynamic (callback) response"""
        super().__init__()
        for generate in (self._generate_static_responses, self._generate_dynamic_responses):
            for resp in generate():
                self.add(resp)

    def _generate_static_responses(self):
        """Build the responses whose payload is fixed (no callback involved)"""
        return [
            # Basic GET requests
            responses.Response(
                method="GET",
                url=self.base_url + "/version",
                json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}},
            ),
            responses.Response(
                method="POST",
                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"),
                # the UPID ends in "done" so polling terminates (status checking is tested elsewhere)
                json={
                    "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done",
                    "success": 1,
                },
            ),
            responses.Response(
                method="POST",
                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"),
                # the UPID ends in "done" so polling terminates (status checking is tested elsewhere)
                json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"},
            ),
            # uploads to the storage named "missing" fail with a server error
            responses.Response(
                method="POST",
                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"),
                status=500,
                body="storage 'missing' does not exist",
            ),
        ]

    def _generate_dynamic_responses(self):
        """Build the responses whose payload is computed by a callback for each request"""
        node_url = self.base_url + r"/nodes/[^/]+"

        # (method, url, callback) in registration order
        routes = [
            # Authentication
            ("POST", self.base_url + "/access/ticket", self._cb_password_auth),
            # Session testing
            ("GET", self.base_url + "/fake/echo", self._cb_echo),
            ("GET", re.compile(node_url + r"/qemu/[^/]+/agent/exec"), self._cb_echo),
            ("GET", re.compile(node_url + r"/qemu/[^/]+/monitor"), self._cb_qemu_monitor),
            ("GET", re.compile(node_url + r"/tasks/[^/]+/status"), self._cb_task_status),
            ("GET", re.compile(node_url + r"/query-url-metadata.*"), self._cb_url_metadata),
        ]

        return [
            responses.CallbackResponse(method=method, url=url, callback=callback)
            for method, url, callback in routes
        ]

    ###################################
    # Callbacks for Dynamic Responses #
    ###################################

    def _cb_echo(self, request):
        """Reply with a JSON description of the request (method, url, headers, cookies, body)"""
        body = request.body
        if body is not None:
            if isinstance(body, MultipartEncoder):
                body = body.to_string()  # despite the name, this yields a byte string
            if not isinstance(body, str):
                body = str(body, "utf-8")

        echoed = json.dumps(
            {
                "method": request.method,
                "url": request.url,
                "headers": dict(request.headers),
                "cookies": request._cookies.get_dict(),
                "body": body,
            }
        )
        return (200, self.common_headers, echoed)

    def _cb_password_auth(self, request):
        """
        Fake the ticket endpoint: the reply depends on the username/password/otp form fields
        """
        form = dict(parse_qsl(request.body))
        username = form.get("username")

        if username == "bad_auth":
            # this user should not be authenticated
            status, data = 401, None
        elif username == "otp" and form.get("otp") is None:
            # this user requires OTP and it is not included
            status = 200
            data = {
                "ticket": "otp_ticket",
                "CSRFPreventionToken": "CSRFPreventionToken",
                "NeedTFA": 1,
            }
        elif form.get("password") != "ticket":
            # this is the first ticket
            status = 200
            data = {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}
        else:
            # this is refreshing the ticket, return new ticket
            status = 200
            data = {
                "ticket": "new_ticket",
                "CSRFPreventionToken": "CSRFPreventionToken_2",
            }

        return (status, self.common_headers, json.dumps({"data": data}))

    def _cb_task_status(self, request):
        """
        Fake the task status endpoint: the reply is chosen by a marker word in the request URL

        The markers are checked in the order listed; a URL containing none of them
        gets an empty JSON object.
        """
        canned = {
            "keep-running": {
                "id": "110",
                "pid": 1044989,
                "node": "node1",
                "pstart": 284768076,
                "status": "running",
                "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
                "starttime": 1661825068,
                "user": "root@pam",
                "type": "vzdump",
            },
            "stopped": {
                "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
                "starttime": 1661825068,
                "user": "root@pam",
                "type": "vzdump",
                "pstart": 284768076,
                "status": "stopped",
                "exitstatus": "interrupted by signal",
                "pid": 1044989,
                "id": "110",
                "node": "node1",
            },
            "done": {
                "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
                "starttime": 1661825068,
                "user": "root@pam",
                "type": "vzdump",
                "pstart": 284768076,
                "status": "stopped",
                "exitstatus": "OK",
                "pid": 1044989,
                "id": "110",
                "node": "node1",
            },
            "comment": {
                "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
                "node": "node",
                "pid": 0,
                "pstart": 0,
                "starttime": 0,
                "type": "task",
                "id": "id",
                "user": "root@pam",
                "status": "stopped",
                "exitstatus": "OK",
            },
        }

        resp = {}
        for marker, data in canned.items():
            if marker in request.url:
                resp = {"data": data}
                break

        return (200, self.common_headers, json.dumps(resp))

    def _cb_url_metadata(self, request):
        """
        Fake the query-url-metadata endpoint: the reply is chosen by the "url" query parameter

        Returns None when the target matches none of the known names.
        """
        query = dict(parse_qsl(urlparse(request.url).query))
        target = query.get("url", "")

        def reply(status, payload):
            return (status, self.common_headers, json.dumps(payload))

        if "file.iso" in target:
            return reply(
                200,
                {
                    "data": {
                        "size": 123456,
                        "filename": "file.iso",
                        "mimetype": "application/x-iso9660-image",
                    },
                    "success": 1,
                },
            )

        if "invalid.iso" in target:
            return reply(
                500,
                {
                    "status": 500,
                    "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n",
                    "success": 0,
                    "data": None,
                },
            )

        if "missing.iso" in target:
            return reply(
                500,
                {
                    "status": 500,
                    "success": 0,
                    "message": "invalid server response: '404 Not Found'\n",
                    "data": None,
                },
            )

        if "index.html" in target:
            return reply(
                200,
                {
                    "success": 1,
                    "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664},
                },
            )

        # no canned reply exists for any other target
        return None

    def _cb_qemu_monitor(self, request):
        """
        Fake the qemu monitor endpoint: reject bodies containing "&" with a type error,
        otherwise echo the request back (and print it)
        """
        body = request.body
        if body is not None and not isinstance(body, str):
            body = str(body, "utf-8")

        # if the command is an array, throw the type error PVE would throw
        if "&" in body:
            error = {
                "data": None,
                "errors": {"command": "type check ('string') failed - got ARRAY"},
            }
            return (400, self.common_headers, json.dumps(error))

        resp = {
            "method": request.method,
            "url": request.url,
            "headers": dict(request.headers),
            "cookies": request._cookies.get_dict(),
            "body": body,
        }
        print(resp)
        return (200, self.common_headers, json.dumps(resp))
07070100000028000081A4000000000000000000000001675E3B1A00000E5A000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/files_mock.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import re

import pytest
import responses
from requests import exceptions

from .api_mock import PVERegistry


@pytest.fixture()
def mock_files():
    """Yield an active ``responses`` mock that uses FilesRegistry as its registry"""
    mocked = responses.RequestsMock(registry=FilesRegistry, assert_all_requests_are_fired=False)
    with mocked as rsps:
        yield rsps


class FilesRegistry(responses.registries.FirstMatchRegistry):
    base_url = "https://sub.domain.tld"

    common_headers = {
        "Cache-Control": "max-age=0",
        "Connection": "close, Keep-Alive",
        "Pragma": "no-cache",
        "Server": "pve-api-daemon/3.0",
        "Content-Type": "application/json;charset=UTF-8",
    }

    def __init__(self):
        """Register every static response of the fake file server"""
        super().__init__()
        static_responses = self._generate_static_responses()
        for static_response in static_responses:
            self.add(static_response)

    def _generate_static_responses(self):
        """Build the GET responses of the fake file server (files and their checksum files)"""

        def get(url, body):
            return responses.Response(method="GET", url=url, body=body)

        base = self.base_url

        return [
            # Basic GET requests
            get(base, "hello world"),
            get(base + "/file.iso", "CONTENTS"),
            # sibling
            get(base + "/sibling/file.iso", "CONTENTS\n"),
            get(base + "/sibling/TESTINGSUMS", "this_is_the_hash  file.iso"),
            # extension
            get(base + "/extension/file.iso", "CONTENTS\n"),
            get(base + "/extension/file.iso.testing", "this_is_the_hash  file.iso"),
            # an exception instance is passed as the body for these two
            get(base + "/extension/connectionerror.iso.testing", exceptions.ConnectionError()),
            get(base + "/extension/readtimeout.iso.testing", exceptions.ReadTimeout()),
            # extension upper
            get(base + "/upper/file.iso", "CONTENTS\n"),
            get(base + "/upper/file.iso.TESTING", "this_is_the_hash  file.iso"),
            # any extension below /checksums/ yields the same 130-digit line
            get(
                re.compile(base + r"/checksums/file.iso.\w+"),
                "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso",
            ),
        ]


@pytest.fixture()
def mock_files_and_pve():
    """Yield an active ``responses`` mock that uses BothRegistry as its registry"""
    mocked = responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False)
    with mocked as rsps:
        yield rsps


class BothRegistry(responses.registries.FirstMatchRegistry):
    """First-match registry pre-loaded with the FilesRegistry and PVERegistry responses."""

    def __init__(self):
        super().__init__()

        # Copy every registered response over, files registry first, then PVE.
        for source in (FilesRegistry(), PVERegistry()):
            for response in source.registered:
                self.add(response)
07070100000029000081A4000000000000000000000001675E3B1A00004215000000000000000000000000000000000000002800000000proxmoxer-2.2.0/tests/known_issues.json{
  "errors": [],
  "generated_at": "2022-08-25T03:08:48Z",
  "metrics": {
    "_totals": {
      "CONFIDENCE.HIGH": 3,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 11,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 3,
      "SEVERITY.MEDIUM": 11,
      "SEVERITY.UNDEFINED": 0,
      "loc": 1947,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/__init__.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 5,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/__init__.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 3,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/command_base.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 115,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/https.py": {
      "CONFIDENCE.HIGH": 1,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 1,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 286,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/local.py": {
      "CONFIDENCE.HIGH": 2,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 2,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 14,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/openssh.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 53,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/backends/ssh_paramiko.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 1,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 1,
      "SEVERITY.UNDEFINED": 0,
      "loc": 58,
      "nosec": 0,
      "skipped_tests": 0
    },
    "proxmoxer/core.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 155,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/api_mock.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 108,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_command_base.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 2,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 2,
      "SEVERITY.UNDEFINED": 0,
      "loc": 195,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_core.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 241,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_https.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 362,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_https_helpers.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 62,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_imports.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 80,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_local.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 0,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 0,
      "SEVERITY.UNDEFINED": 0,
      "loc": 35,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_openssh.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 2,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 2,
      "SEVERITY.UNDEFINED": 0,
      "loc": 62,
      "nosec": 0,
      "skipped_tests": 0
    },
    "tests/test_paramiko.py": {
      "CONFIDENCE.HIGH": 0,
      "CONFIDENCE.LOW": 0,
      "CONFIDENCE.MEDIUM": 6,
      "CONFIDENCE.UNDEFINED": 0,
      "SEVERITY.HIGH": 0,
      "SEVERITY.LOW": 0,
      "SEVERITY.MEDIUM": 6,
      "SEVERITY.UNDEFINED": 0,
      "loc": 113,
      "nosec": 0,
      "skipped_tests": 0
    }
  },
  "results": [
    {
      "code": "332     def get_serializer(self):\n333         assert self.mode == \"json\"\n334         return JsonSerializer()\n",
      "col_offset": 8,
      "filename": "proxmoxer/backends/https.py",
      "issue_confidence": "HIGH",
      "issue_cwe": {
        "id": 703,
        "link": "https://cwe.mitre.org/data/definitions/703.html"
      },
      "issue_severity": "LOW",
      "issue_text": "Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.",
      "line_number": 333,
      "line_range": [
        333
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b101_assert_used.html",
      "test_id": "B101",
      "test_name": "assert_used"
    },
    {
      "code": "1 import shutil\n2 from subprocess import PIPE, Popen\n3 \n4 from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession\n",
      "col_offset": 0,
      "filename": "proxmoxer/backends/local.py",
      "issue_confidence": "HIGH",
      "issue_cwe": {
        "id": 78,
        "link": "https://cwe.mitre.org/data/definitions/78.html"
      },
      "issue_severity": "LOW",
      "issue_text": "Consider possible security implications associated with the subprocess module.",
      "line_number": 2,
      "line_range": [
        2,
        3
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/blacklists/blacklist_imports.html#b404-import-subprocess",
      "test_id": "B404",
      "test_name": "blacklist"
    },
    {
      "code": "8     def _exec(self, cmd):\n9         proc = Popen(cmd, stdout=PIPE, stderr=PIPE)\n10         stdout, stderr = proc.communicate(timeout=self.timeout)\n",
      "col_offset": 15,
      "filename": "proxmoxer/backends/local.py",
      "issue_confidence": "HIGH",
      "issue_cwe": {
        "id": 78,
        "link": "https://cwe.mitre.org/data/definitions/78.html"
      },
      "issue_severity": "LOW",
      "issue_text": "subprocess call - check for execution of untrusted input.",
      "line_number": 9,
      "line_range": [
        9
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b603_subprocess_without_shell_equals_true.html",
      "test_id": "B603",
      "test_name": "subprocess_without_shell_equals_true"
    },
    {
      "code": "62         session = self.ssh_client.get_transport().open_session()\n63         session.exec_command(shell_join(cmd))\n64         stdout = session.makefile(\"rb\", -1).read().decode()\n",
      "col_offset": 8,
      "filename": "proxmoxer/backends/ssh_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 78,
        "link": "https://cwe.mitre.org/data/definitions/78.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Possible shell injection via Paramiko call, check inputs are properly sanitized.",
      "line_number": 63,
      "line_range": [
        63
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b601_paramiko_calls.html",
      "test_id": "B601",
      "test_name": "paramiko_calls"
    },
    {
      "code": "39         with pytest.raises(NotImplementedError), tempfile.TemporaryFile(\"w+b\") as f_obj:\n40             self._session.upload_file_obj(f_obj, \"/tmp/file.iso\")\n41 \n",
      "col_offset": 49,
      "filename": "tests/test_command_base.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 40,
      "line_range": [
        40
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "160                 \"-tmpfilename\",\n161                 \"/tmp/tmpasdfasdf\",\n162                 \"--output-format\",\n163                 \"json\",\n164             ]\n165 \n166 \n167 class TestJsonSimpleSerializer:\n168     _serializer = command_base.JsonSimpleSerializer()\n169 \n170     def test_loads_pass(self):\n171         input_str = '{\"key1\": \"value1\", \"key2\": \"value2\"}'\n172         exp_output = {\"key1\": \"value1\", \"key2\": \"value2\"}\n173 \n",
      "col_offset": 16,
      "filename": "tests/test_command_base.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 161,
      "line_range": [
        152,
        153,
        154,
        155,
        156,
        157,
        158,
        159,
        160,
        161,
        162,
        163
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "61         with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n62             mock_session.upload_file_obj(f_obj, \"/tmp/file\")\n63 \n",
      "col_offset": 48,
      "filename": "tests/test_openssh.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 62,
      "line_range": [
        62
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "65                 (f_obj,),\n66                 target=\"/tmp/file\",\n67             )\n",
      "col_offset": 23,
      "filename": "tests/test_openssh.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 66,
      "line_range": [
        66
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "23         sess = ssh_paramiko.SshParamikoSession(\n24             \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n25         )\n",
      "col_offset": 66,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 24,
      "line_range": [
        24
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "29         assert sess.password == \"password\"\n30         assert sess.private_key_file == \"/tmp/key_file\"\n31         assert sess.port == 1234\n",
      "col_offset": 40,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 30,
      "line_range": [
        30
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "55         sess = ssh_paramiko.SshParamikoSession(\n56             \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n57         )\n",
      "col_offset": 66,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 56,
      "line_range": [
        56
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "63             look_for_keys=True,\n64             key_filename=\"/tmp/key_file\",\n65             password=\"password\",\n",
      "col_offset": 25,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 64,
      "line_range": [
        64
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "110         with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n111             sess.upload_file_obj(f_obj, \"/tmp/file\")\n112 \n",
      "col_offset": 40,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 111,
      "line_range": [
        111
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "112 \n113             mock_sftp.putfo.assert_called_once_with(f_obj, \"/tmp/file\")\n114 \n",
      "col_offset": 59,
      "filename": "tests/test_paramiko.py",
      "issue_confidence": "MEDIUM",
      "issue_cwe": {
        "id": 377,
        "link": "https://cwe.mitre.org/data/definitions/377.html"
      },
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 113,
      "line_range": [
        113
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    }
  ]
}0707010000002A000081A4000000000000000000000001675E3B1A0000238E000000000000000000000000000000000000002B00000000proxmoxer-2.2.0/tests/test_command_base.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import tempfile
from unittest import mock

import pytest

from proxmoxer.backends import command_base

from .api_mock import PVERegistry

# pylint: disable=no-self-use


class TestResponse:
    """Checks for the attributes exposed by ``command_base.Response``."""

    def test_init_all_args(self):
        """Content and status are stored; text, headers and str() match the values below."""
        payload = b"content"
        status = 200

        response = command_base.Response(payload, status)

        assert response.content == payload
        assert response.text == "b'content'"
        assert response.status_code == status
        assert response.headers == {"content-type": "application/json"}
        assert str(response) == "Response (200) b'content'"


class TestCommandBaseSession:
    """Exercise ``CommandBaseSession`` with its ``_exec``/upload hooks patched by fixtures."""

    base_url = PVERegistry.base_url
    _session = command_base.CommandBaseSession()

    def test_init_all_args(self):
        """Constructor arguments are stored; the service name comes back lower-cased."""
        session = command_base.CommandBaseSession(service="SERVICE", timeout=10, sudo=True)

        assert session.service == "service"
        assert session.timeout == 10
        assert session.sudo is True

    def test_exec(self):
        """The base session does not implement ``_exec``."""
        with pytest.raises(NotImplementedError):
            self._session._exec("command")

    def test_upload_file_obj(self):
        """The base session does not implement ``upload_file_obj``."""
        with pytest.raises(NotImplementedError):
            with tempfile.TemporaryFile("w+b") as f_obj:
                self._session.upload_file_obj(f_obj, "/tmp/file.iso")

    def test_request_basic(self, mock_exec):
        """A plain GET is turned into ``pvesh get <url> --output-format json``."""
        url = self.base_url + "/fake/echo"
        expected_cmd = ["pvesh", "get", url, "--output-format", "json"]

        response = self._session.request("GET", url)

        assert response.status_code == 200
        assert response.content == expected_cmd

    def test_request_task(self, mock_exec_task):
        """A UPID is reported as a 200 whether it arrives in the first or second output slot."""
        upid = "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done"

        stdout_response = self._session.request("GET", self.base_url + "/stdout")

        assert stdout_response.status_code == 200
        assert stdout_response.content == upid

        stderr_response = self._session.request("GET", self.base_url + "/stderr")

        assert stderr_response.status_code == 200
        assert stderr_response.content == upid

    def test_request_error(self, mock_exec_err):
        """Error output mentioning ``403 Unauthorized`` produces a 403 response."""
        payload = {"thing": "403 Unauthorized"}
        expected_content = "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\n403 Unauthorized\n--output-format\njson"

        response = self._session.request("GET", self.base_url + "/fake/echo", data=payload)

        assert response.status_code == 403
        assert response.content == expected_content

    def test_request_error_generic(self, mock_exec_err):
        """Error output without a recognisable status produces a 500 response."""
        payload = {"thing": "failure"}
        expected_content = "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\nfailure\n--output-format\njson"

        response = self._session.request("GET", self.base_url + "/fake/echo", data=payload)

        assert response.status_code == 500
        assert response.content == expected_content

    def test_request_sudo(self, mock_exec):
        """With ``sudo=True`` the command is prefixed with ``sudo``."""
        url = self.base_url + "/fake/echo"
        sudo_session = command_base.CommandBaseSession(sudo=True)

        response = sudo_session.request("GET", url)

        assert response.status_code == 200
        assert response.content == ["sudo", "pvesh", "get", url, "--output-format", "json"]

    def test_request_data(self, mock_exec):
        """Each data item is passed as ``-<key> <value>`` before the output-format flag."""
        url = self.base_url + "/fake/echo"
        expected_cmd = ["pvesh", "get", url, "-key", "value", "--output-format", "json"]

        response = self._session.request("GET", url, data={"key": "value"})

        assert response.status_code == 200
        assert response.content == expected_cmd

    def test_request_bytes_data(self, mock_exec):
        """A bytes data value shows up in the command as its text form."""
        url = self.base_url + "/fake/echo"
        expected_cmd = ["pvesh", "get", url, "-key", "bytes-value", "--output-format", "json"]

        response = self._session.request("GET", url, data={"key": b"bytes-value"})

        assert response.status_code == 200
        assert response.content == expected_cmd

    def test_request_qemu_exec(self, mock_exec):
        """An agent/exec command string becomes one ``-command`` argument per word.

        The quoted part of the command stays together as a single argument.
        """
        url = self.base_url + "/node/node1/qemu/100/agent/exec"
        expected_cmd = [
            "pvesh",
            "create",
            url,
            "-command",
            "echo",
            "-command",
            "hello world",
            "--output-format",
            "json",
        ]

        response = self._session.request("POST", url, data={"command": "echo 'hello world'"})

        assert response.status_code == 200
        assert response.content == expected_cmd

    def test_request_qemu_exec_list(self, mock_exec):
        """An agent/exec command list becomes one ``-command`` argument per item."""
        url = self.base_url + "/node/node1/qemu/100/agent/exec"
        expected_cmd = [
            "pvesh",
            "create",
            url,
            "-command",
            "echo",
            "-command",
            "hello world",
            "--output-format",
            "json",
        ]

        response = self._session.request("POST", url, data={"command": ["echo", "hello world"]})

        assert response.status_code == 200
        assert response.content == expected_cmd

    def test_request_upload(self, mock_exec, mock_upload_file_obj):
        """Uploading a file object adds ``-filename`` and ``-tmpfilename`` arguments."""
        url = self.base_url + "/node/node1/storage/local/upload"

        with tempfile.NamedTemporaryFile("w+b") as f_obj:
            response = self._session.request(
                "POST",
                url,
                data={"content": "iso", "filename": f_obj},
            )

            # "/tmp/tmpasdfasdf" is the fake remote temp path supplied by the mocked _exec
            expected_cmd = [
                "pvesh",
                "create",
                url,
                "-content",
                "iso",
                "-filename",
                str(f_obj.name),
                "-tmpfilename",
                "/tmp/tmpasdfasdf",
                "--output-format",
                "json",
            ]

            assert response.status_code == 200
            assert response.content == expected_cmd


class TestJsonSimpleSerializer:
    """Checks for ``JsonSimpleSerializer.loads`` on valid and invalid content."""

    _serializer = command_base.JsonSimpleSerializer()

    def test_loads_pass(self):
        """Valid JSON content is decoded into the matching dict."""
        raw = '{"key1": "value1", "key2": "value2"}'
        expected = {"key1": "value1", "key2": "value2"}

        parsed = self._serializer.loads(command_base.Response(raw.encode("utf-8"), 200))

        assert parsed == expected

    def test_loads_not_json(self):
        """Non-JSON content is handed back as raw bytes under ``errors``."""
        raw = "There was an error with the request"
        expected = {"errors": b"There was an error with the request"}

        parsed = self._serializer.loads(command_base.Response(raw.encode("utf-8"), 200))

        assert parsed == expected

    def test_loads_not_unicode(self):
        """Content with a trailing ``\\x80`` character is handed back as raw bytes under ``errors``."""
        raw = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80'
        encoded = raw.encode("utf-8")
        expected = {"errors": encoded}

        parsed = self._serializer.loads(command_base.Response(encoded, 200))

        assert parsed == expected


class TestCommandBaseBackend:
    """Checks for the accessors of ``CommandBaseBackend``."""

    backend = command_base.CommandBaseBackend()
    sess = command_base.CommandBaseSession()

    # attach the shared session so get_session() has something to return
    backend.session = sess

    def test_init(self):
        """A fresh backend has neither a session nor a target."""
        fresh_backend = command_base.CommandBaseBackend()

        assert fresh_backend.session is None
        assert fresh_backend.target is None

    def test_get_session(self):
        """get_session() returns the session assigned to the backend."""
        session = self.backend.get_session()

        assert session == self.sess

    def test_get_base_url(self):
        """get_base_url() is the empty string."""
        base_url = self.backend.get_base_url()

        assert base_url == ""

    def test_get_serializer(self):
        """get_serializer() returns a JsonSimpleSerializer."""
        serializer = self.backend.get_serializer()

        assert isinstance(serializer, command_base.JsonSimpleSerializer)


@classmethod
def _exec_echo(_, cmd):
    """Fake ``_exec`` that echoes the command back as the first element of the result.

    The one exception is the command asking the remote for a temp file name,
    which is answered with a fixed fake path. The second element is always None.
    """
    tmpfile_cmd = [
        "python3",
        "-c",
        "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
    ]
    output = b"/tmp/tmpasdfasdf" if cmd == tmpfile_cmd else cmd
    return output, None


@classmethod
def _exec_err(_, cmd):
    """Fake ``_exec`` that returns no output and the newline-joined command as the second element."""
    joined = "\n".join(cmd)
    return None, joined


@classmethod
def _exec_task(_, cmd):
    """Fake ``_exec`` that returns a fixed UPID in one of the two result slots.

    The UPID goes in the second slot when the third command element contains
    "stderr", otherwise in the first slot; the other slot is None.
    """
    upid = "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done"
    return (None, upid) if "stderr" in cmd[2] else (upid, None)


@classmethod
def upload_file_obj_echo(_, file_obj, remote_path):
    """Fake ``upload_file_obj`` that uploads nothing and hands both arguments back as a tuple."""
    echoed = (file_obj, remote_path)
    return echoed


@pytest.fixture
def mock_upload_file_obj():
    """Replace ``CommandBaseSession.upload_file_obj`` with ``upload_file_obj_echo`` during a test."""
    patcher = mock.patch.object(
        command_base.CommandBaseSession, "upload_file_obj", upload_file_obj_echo
    )
    with patcher:
        yield


@pytest.fixture
def mock_exec():
    """Replace ``CommandBaseSession._exec`` with the echoing fake during a test."""
    patcher = mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_echo)
    with patcher:
        yield


@pytest.fixture
def mock_exec_task():
    """Replace ``CommandBaseSession._exec`` with the UPID-returning fake during a test."""
    patcher = mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_task)
    with patcher:
        yield


@pytest.fixture
def mock_exec_err():
    """Replace ``CommandBaseSession._exec`` with the error-returning fake during a test."""
    patcher = mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_err)
    with patcher:
        yield
0707010000002B000081A4000000000000000000000001675E3B1A00003709000000000000000000000000000000000000002300000000proxmoxer-2.2.0/tests/test_core.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import logging
from unittest import mock

import pytest

from proxmoxer import core
from proxmoxer.backends import https
from proxmoxer.backends.command_base import JsonSimpleSerializer, Response

from .api_mock import (  # pylint: disable=unused-import # noqa: F401
    PVERegistry,
    mock_pve,
)
from .test_paramiko import mock_ssh_client  # pylint: disable=unused-import # noqa: F401

# pylint: disable=no-self-use,protected-access

MODULE_LOGGER_NAME = "proxmoxer.core"


class TestResourceException:
    """Checks for the attributes and string forms of ``core.ResourceException``."""

    def test_init_none(self):
        """All-None arguments are stored as None and rendered literally."""
        exc = core.ResourceException(None, None, None)

        assert exc.status_code is None
        assert exc.status_message is None
        assert exc.content is None
        assert exc.errors is None
        assert str(exc) == "None None: None"
        assert repr(exc) == "ResourceException('None None: None')"

    def test_init_basic(self):
        """Without errors, the message is ``<code> <status message>: <content>``."""
        code = 500
        message = "Internal Error"
        content = "Unable to do the thing"

        exc = core.ResourceException(code, message, content)

        assert exc.status_code == code
        assert exc.status_message == message
        assert exc.content == content
        assert exc.errors is None
        assert str(exc) == "500 Internal Error: Unable to do the thing"
        assert repr(exc) == "ResourceException('500 Internal Error: Unable to do the thing')"

    def test_init_error(self):
        """With errors, they are appended to the message after `` - ``."""
        code = 500
        message = "Internal Error"
        content = "Unable to do the thing"
        errors = "functionality not found"

        exc = core.ResourceException(code, message, content, errors)

        assert exc.status_code == code
        assert exc.status_message == message
        assert exc.content == content
        assert exc.errors == errors
        assert str(exc) == "500 Internal Error: Unable to do the thing - functionality not found"
        assert (
            repr(exc)
            == "ResourceException('500 Internal Error: Unable to do the thing - functionality not found')"
        )


class TestProxmoxResource:
    """Checks for URL building, attribute/call chaining and ``_request`` on ``ProxmoxResource``.

    The ``_request`` tests rely on the ``mock_resource`` fixture, which is
    defined outside this part of the file.
    """

    obj = core.ProxmoxResource()
    base_url = "http://example.com/"

    def test_url_join_empty_base(self):
        """Joining two empty strings gives a bare slash."""
        joined = self.obj.url_join("", "")

        assert joined == "/"

    def test_url_join_empty(self):
        """Joining an empty segment onto a host adds a trailing slash."""
        joined = self.obj.url_join("https://www.example.com:80", "")

        assert joined == "https://www.example.com:80/"

    def test_url_join_basic(self):
        """Segments are appended to the base, separated by slashes."""
        joined = self.obj.url_join("https://www.example.com", "nodes", "node1")

        assert joined == "https://www.example.com/nodes/node1"

    def test_url_join_all_segments(self):
        """The new segment is added to the path, ahead of the fragment/query part."""
        joined = self.obj.url_join("https://www.example.com/base#div1?search=query", "path")

        assert joined == "https://www.example.com/base/path#div1?search=query"

    def test_repr(self):
        """repr() shows the class name and the accumulated URL."""
        root = core.ProxmoxResource(base_url="root")
        leaf = root.first.second("third")

        assert repr(leaf) == "ProxmoxResource (root/first/second/third)"

    def test_getattr_private(self):
        """Underscore-prefixed attributes raise AttributeError instead of chaining."""
        with pytest.raises(AttributeError) as exc_info:
            _ = self.obj._thing

        assert str(exc_info.value) == "_thing"

    def test_getattr_single(self):
        """Attribute access returns a new resource with the name appended to the URL."""
        resource = core.ProxmoxResource(base_url=self.base_url)

        child = resource.nodes

        assert isinstance(child, core.ProxmoxResource)
        assert child._store["base_url"] == self.base_url + "nodes"

    def test_call_basic(self):
        """Calling with a string returns a new resource with it appended to the URL."""
        resource = core.ProxmoxResource(base_url=self.base_url)

        child = resource("nodes")

        assert isinstance(child, core.ProxmoxResource)
        assert child._store["base_url"] == self.base_url + "nodes"

    def test_call_emptystr(self):
        """Calling with an empty string leaves the URL unchanged."""
        resource = core.ProxmoxResource(base_url=self.base_url)

        child = resource("")

        assert isinstance(child, core.ProxmoxResource)
        assert child._store["base_url"] == self.base_url

    def test_call_list(self):
        """Calling with a list appends every item as its own path segment."""
        resource = core.ProxmoxResource(base_url=self.base_url)

        child = resource(["nodes", "node1"])

        assert isinstance(child, core.ProxmoxResource)
        assert child._store["base_url"] == self.base_url + "nodes/node1"

    def test_call_stringable(self):
        """Calling with an arbitrary object appends its str() form."""

        class Stringable:
            def __str__(self):
                return "string"

        resource = core.ProxmoxResource(base_url=self.base_url)

        child = resource(Stringable())

        assert isinstance(child, core.ProxmoxResource)
        assert child._store["base_url"] == self.base_url + "string"

    def test_request_basic_get(self, mock_resource, caplog):
        """A GET logs the request at INFO and the response at DEBUG, then returns the data."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        expected_records = [
            (MODULE_LOGGER_NAME, logging.INFO, "GET " + self.base_url),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"key": "value"}}\'',
            ),
        ]

        result = mock_resource._request("GET", params={"key": "value"})

        assert caplog.record_tuples == expected_records
        assert result == {"data": {"key": "value"}}

    def test_request_basic_post(self, mock_resource, caplog):
        """A POST additionally logs the submitted data on the INFO line."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        expected_records = [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "POST " + self.base_url + " " + str({"key": "value"}),
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"key": "value"}}\'',
            ),
        ]

        result = mock_resource._request("POST", data={"key": "value"})

        assert caplog.record_tuples == expected_records
        assert result == {"data": {"key": "value"}}

    def test_request_fail(self, mock_resource, caplog):
        """A 500 response raises ResourceException carrying the response content."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        expected_records = [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "GET " + self.base_url + "fail",
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                "Status code: 500, output: b'this is the error'",
            ),
        ]

        with pytest.raises(core.ResourceException) as exc_info:
            mock_resource("fail")._request("GET")

        raised = exc_info.value
        assert caplog.record_tuples == expected_records
        assert raised.status_code == 500
        assert raised.status_message == "Internal Server Error"
        assert raised.content == str(b"this is the error")
        assert raised.errors is None

    def test_request_fail_with_reason(self, mock_resource, caplog):
        """When a reason is available it becomes the content and the body goes into errors."""
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
        expected_records = [
            (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "GET " + self.base_url + "fail/reason",
            ),
            (
                MODULE_LOGGER_NAME,
                logging.DEBUG,
                "Status code: 500, output: b'this is the error'",
            ),
        ]

        with pytest.raises(core.ResourceException) as exc_info:
            mock_resource(["fail", "reason"])._request("GET")

        raised = exc_info.value
        assert caplog.record_tuples == expected_records
        assert raised.status_code == 500
        assert raised.status_message == "Internal Server Error"
        assert raised.content == "this is the reason"
        assert raised.errors == {"errors": b"this is the error"}

    def test_request_params_cleanup(self, mock_resource):
        """Params whose value is None are dropped before reaching the session."""
        mock_resource._request("GET", params={"key": "value", "remove_me": None})

        session = mock_resource._store["session"]
        assert session.params == {"key": "value"}

    def test_request_data_cleanup(self, mock_resource):
        """Data entries whose value is None are dropped before reaching the session."""
        mock_resource._request("POST", data={"key": "value", "remove_me": None})

        session = mock_resource._store["session"]
        assert session.data == {"key": "value"}


class TestProxmoxResourceMethods:
    """
    Checks the HTTP-verb helpers of ProxmoxResource.

    Each test uses the ``mock_private_request`` fixture, so the value returned
    by a helper is the dict of arguments that reached ``_request``.
    """

    _resource = core.ProxmoxResource(base_url="https://example.com")

    def test_get(self, mock_private_request):
        echoed = self._resource.get("nodes", key="value")

        assert echoed["method"] == "GET"
        assert echoed["params"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"

    def test_post(self, mock_private_request):
        echoed = self._resource.post("nodes", key="value")

        assert echoed["method"] == "POST"
        assert echoed["data"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"

    def test_put(self, mock_private_request):
        echoed = self._resource.put("nodes", key="value")

        assert echoed["method"] == "PUT"
        assert echoed["data"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"

    def test_delete(self, mock_private_request):
        echoed = self._resource.delete("nodes", key="value")

        assert echoed["method"] == "DELETE"
        assert echoed["params"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"

    def test_create(self, mock_private_request):
        # create() is expected to behave like post()
        echoed = self._resource.create("nodes", key="value")

        assert echoed["method"] == "POST"
        assert echoed["data"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"

    def test_set(self, mock_private_request):
        # set() is expected to behave like put()
        echoed = self._resource.set("nodes", key="value")

        assert echoed["method"] == "PUT"
        assert echoed["data"] == {"key": "value"}
        assert echoed["self"]._store["base_url"] == "https://example.com/nodes"


class TestProxmoxAPI:
    """Tests construction, repr and token retrieval of the top-level ProxmoxAPI object."""

    def test_init_basic(self):
        # mixed-case service and backend names must still be accepted
        api = core.ProxmoxAPI(
            "host", token_name="name", token_value="value", service="pVe", backend="hTtPs"
        )

        assert isinstance(api, core.ProxmoxAPI)
        assert isinstance(api, core.ProxmoxResource)
        assert isinstance(api._backend, https.Backend)
        assert api._backend.auth.service == "PVE"

    def test_init_invalid_service(self):
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="NA")

        message = str(exc_info.value)
        assert message == "NA service is not supported"

    def test_init_invalid_backend(self):
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="pbs", backend="LocaL")

        message = str(exc_info.value)
        assert message == "PBS service does not support local backend"

    def test_init_local_with_host(self):
        with pytest.raises(NotImplementedError) as exc_info:
            core.ProxmoxAPI("host", service="pve", backend="LocaL")

        message = str(exc_info.value)
        assert message == "local backend does not support host keyword"

    def test_repr_https(self):
        api = core.ProxmoxAPI("host", token_name="name", token_value="value", backend="hTtPs")

        assert repr(api) == "ProxmoxAPI (https backend for https://host:8006/api2/json)"

    def test_repr_local(self):
        api = core.ProxmoxAPI(backend="local")

        assert repr(api) == "ProxmoxAPI (local backend for localhost)"

    def test_repr_openssh(self):
        api = core.ProxmoxAPI("host", user="user", backend="openssh")

        assert repr(api) == "ProxmoxAPI (openssh backend for host)"

    def test_repr_paramiko(self, mock_ssh_client):
        api = core.ProxmoxAPI("host", user="user", backend="ssh_paramiko")

        assert repr(api) == "ProxmoxAPI (ssh_paramiko backend for host)"

    def test_get_tokens_https(self, mock_pve):
        api = core.ProxmoxAPI("1.2.3.4:1234", user="user", password="password", backend="https")
        auth_ticket, csrf_token = api.get_tokens()

        assert auth_ticket == "ticket"
        assert csrf_token == "CSRFPreventionToken"

    def test_get_tokens_local(self):
        api = core.ProxmoxAPI(service="pve", backend="local")
        auth_ticket, csrf_token = api.get_tokens()

        assert auth_ticket is None
        assert csrf_token is None

    def test_init_with_cert(self):
        # a single string is passed as the client certificate
        api = core.ProxmoxAPI(
            "host",
            token_name="name",
            token_value="value",
            service="pVe",
            backend="hTtPs",
            cert="somepem",
        )

        assert isinstance(api, core.ProxmoxAPI)
        assert isinstance(api, core.ProxmoxResource)
        assert isinstance(api._backend, https.Backend)
        assert api._backend.auth.service == "PVE"
        # the certificate must be visible on both the backend and its session
        assert api._backend.cert == "somepem"
        assert api._store["session"].cert == "somepem"

    def test_init_with_cert_key(self):
        # a (cert, key) tuple is passed as the client certificate
        cert_pair = ("somepem", "somekey")
        api = core.ProxmoxAPI(
            "host",
            token_name="name",
            token_value="value",
            service="pVe",
            backend="hTtPs",
            cert=cert_pair,
        )

        assert isinstance(api, core.ProxmoxAPI)
        assert isinstance(api, core.ProxmoxResource)
        assert isinstance(api._backend, https.Backend)
        assert api._backend.auth.service == "PVE"
        # the certificate must be visible on both the backend and its session
        assert api._backend.cert == ("somepem", "somekey")
        assert api._store["session"].cert == ("somepem", "somekey")


class MockSession:
    """Session stand-in that records the last request and answers with canned responses."""

    def request(self, method, url, data=None, params=None):
        """
        Record the call arguments on the instance and return a canned response.

        URLs containing "fail" produce a 500 response (with a custom reason when
        the URL also contains "reason"); any other URL produces a 200 response.
        """
        # keep the arguments on the session so tests can inspect them afterwards
        self.method, self.url = method, url
        self.data, self.params = data, params

        if "fail" not in url:
            return Response(b'{"data": {"key": "value"}}', 200)

        failure = Response(b"this is the error", 500)
        if "reason" in url:
            failure.reason = "this is the reason"
        return failure


@pytest.fixture
def mock_private_request():
    """Replace ``ProxmoxResource._request`` with a stub that echoes its arguments as a dict."""

    def echo_request(self, method, data=None, params=None):
        return {"self": self, "method": method, "data": data, "params": params}

    request_patch = mock.patch("proxmoxer.core.ProxmoxResource._request", echo_request)
    with request_patch:
        yield


@pytest.fixture
def mock_resource():
    """Provide a ProxmoxResource backed by a MockSession and a JsonSimpleSerializer."""
    resource_kwargs = {
        "session": MockSession(),
        "base_url": "http://example.com/",
        "serializer": JsonSimpleSerializer(),
    }
    return core.ProxmoxResource(**resource_kwargs)
0707010000002C000081A4000000000000000000000001675E3B1A00004F36000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/test_https.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import logging
import re
import sys
import tempfile
from unittest import mock

import pytest
from requests import Request, Response

import proxmoxer as core
from proxmoxer.backends import https

from .api_mock import (  # pylint: disable=unused-import # noqa: F401
    PVERegistry,
    mock_pve,
)

# pylint: disable=no-self-use

MODULE_LOGGER_NAME = "proxmoxer.backends.https"


class TestHttpsBackend:
    """
    Tests for the Backend class of proxmoxer.backends.https.

    Covers base URL construction, selection of the auth class and the
    verify_ssl setting. The other classes of the module are tested separately.
    """

    def test_init_no_auth(self):
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234")

        message = str(exc_info.value)
        assert message == "No valid authentication credentials were supplied"

    def test_init_ip4_separate_port(self):
        base_url = https.Backend("1.2.3.4", port=1234, token_name="").get_base_url()

        assert base_url == "https://1.2.3.4:1234/api2/json"

    def test_init_ip4_inline_port(self):
        base_url = https.Backend("1.2.3.4:1234", token_name="").get_base_url()

        assert base_url == "https://1.2.3.4:1234/api2/json"

    def test_init_ip6_separate_port(self):
        # a bare IPv6 host is expected to come back wrapped in brackets
        base_url = https.Backend("2001:db8::1:2:3:4", port=1234, token_name="").get_base_url()

        assert base_url == "https://[2001:db8::1:2:3:4]:1234/api2/json"

    def test_init_ip6_brackets_separate_port(self):
        base_url = https.Backend("[2001:0db8::1:2:3:4]", port=1234, token_name="").get_base_url()

        assert base_url == "https://[2001:0db8::1:2:3:4]:1234/api2/json"

    def test_init_ip6_inline_port(self):
        base_url = https.Backend("[2001:db8::1:2:3:4]:1234", token_name="").get_base_url()

        assert base_url == "https://[2001:db8::1:2:3:4]:1234/api2/json"

    def test_init_ip4_no_port(self):
        # without an explicit port the URL is expected to use 8006
        base_url = https.Backend("1.2.3.4", token_name="").get_base_url()

        assert base_url == "https://1.2.3.4:8006/api2/json"

    def test_init_path_prefix(self):
        backend = https.Backend("1.2.3.4:1234", path_prefix="path", token_name="")

        assert backend.get_base_url() == "https://1.2.3.4:1234/path/api2/json"

    def test_init_token_pass(self):
        token_backend = https.Backend("1.2.3.4:1234", token_name="name")

        assert isinstance(token_backend.auth, https.ProxmoxHTTPApiTokenAuth)

    def test_init_token_not_supported(self, apply_none_service):
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234", token_name="name", service="NONE")

        message = str(exc_info.value)
        assert message == "NONE does not support API Token authentication"

    def test_init_password_not_supported(self, apply_none_service):
        with pytest.raises(NotImplementedError) as exc_info:
            https.Backend("1.2.3.4:1234", password="pass", service="NONE")

        message = str(exc_info.value)
        assert message == "NONE does not support password authentication"

    def test_get_tokens_api_token(self):
        token_backend = https.Backend("1.2.3.4:1234", token_name="name")

        assert token_backend.get_tokens() == (None, None)

    def test_get_tokens_password(self, mock_pve):
        password_backend = https.Backend("1.2.3.4:1234", password="name")

        assert password_backend.get_tokens() == ("ticket", "CSRFPreventionToken")

    def test_verify_ssl_token(self):
        auth = https.Backend("1.2.3.4:1234", token_name="name").auth
        assert auth.verify_ssl is True

    def test_verify_ssl_false_token(self):
        auth = https.Backend("1.2.3.4:1234", token_name="name", verify_ssl=False).auth
        assert auth.verify_ssl is False

    def test_verify_ssl_password(self, mock_pve):
        auth = https.Backend("1.2.3.4:1234", password="name").auth
        assert auth.verify_ssl is True

    def test_verify_ssl_false_password(self, mock_pve):
        auth = https.Backend("1.2.3.4:1234", password="name", verify_ssl=False).auth
        assert auth.verify_ssl is False


class TestProxmoxHTTPAuthBase:
    """
    Tests the ProxmoxHTTPAuthBase class
    """

    base_url = PVERegistry.base_url

    def test_init_all_args(self):
        base_auth = https.ProxmoxHTTPAuthBase(timeout=1234, service="PMG", verify_ssl=True)

        assert base_auth.timeout == 1234
        assert base_auth.service == "PMG"
        assert base_auth.verify_ssl is True

    def test_call(self):
        base_auth = https.ProxmoxHTTPAuthBase()
        prepared = Request("HEAD", self.base_url + "/version").prepare()

        # calling the base auth must hand back a request equal to its input
        assert base_auth(prepared) == prepared

    def test_get_cookies(self):
        cookie_jar = https.ProxmoxHTTPAuthBase().get_cookies()

        assert cookie_jar.get_dict() == {}


class TestProxmoxHTTPApiTokenAuth:
    """
    Tests the ProxmoxHTTPApiTokenAuth class
    """

    base_url = PVERegistry.base_url

    def test_init_all_args(self):
        token_auth = https.ProxmoxHTTPApiTokenAuth(
            "user", "name", "value", service="PMG", timeout=1234, verify_ssl=True
        )

        assert token_auth.username == "user"
        assert token_auth.token_name == "name"
        assert token_auth.token_value == "value"
        assert token_auth.service == "PMG"
        assert token_auth.timeout == 1234
        assert token_auth.verify_ssl is True

    def test_call_pve(self):
        token_auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PVE")
        prepared = Request("HEAD", self.base_url + "/version").prepare()
        signed = token_auth(prepared)

        # the PVE header joins token name and value with "="
        assert signed.headers["Authorization"] == "PVEAPIToken=user!name=value"

    def test_call_pbs(self):
        token_auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PBS")
        prepared = Request("HEAD", self.base_url + "/version").prepare()
        signed = token_auth(prepared)

        # the PBS header joins token name and value with ":"
        assert signed.headers["Authorization"] == "PBSAPIToken=user!name:value"


class TestProxmoxHTTPAuth:
    """
    Tests the ProxmoxHTTPAuth class (username/password ticket authentication)
    """

    base_url = PVERegistry.base_url

    # pylint: disable=redefined-outer-name

    def test_init_all_args(self, mock_pve):
        """All constructor arguments are stored and a ticket is obtained on construction."""
        auth = https.ProxmoxHTTPAuth(
            "otp",
            "password",
            otp="otp",
            base_url=self.base_url,
            service="PMG",
            timeout=1234,
            verify_ssl=True,
        )

        assert auth.username == "otp"
        assert auth.pve_auth_ticket == "ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken"
        assert auth.service == "PMG"
        assert auth.timeout == 1234
        assert auth.verify_ssl is True

    def test_ticket_renewal(self, mock_pve):
        """Once the renewal age has elapsed, the next call swaps in new tokens."""
        auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url)

        auth(Request("HEAD", self.base_url + "/version").prepare())

        # check starting auth tokens
        assert auth.pve_auth_ticket == "ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken"

        auth.renew_age = 0  # force renewing ticket now
        auth(Request("GET", self.base_url + "/version").prepare())

        # check renewed auth tokens
        assert auth.pve_auth_ticket == "new_ticket"
        assert auth.csrf_prevention_token == "CSRFPreventionToken_2"

    def test_get_cookies(self, mock_pve):
        """The auth ticket is exposed as the PVEAuthCookie cookie."""
        auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url, service="PVE")

        assert auth.get_cookies().get_dict() == {"PVEAuthCookie": "ticket"}

    def test_auth_failure(self, mock_pve):
        """Rejected credentials raise AuthenticationError naming the user and the ticket URL."""
        with pytest.raises(core.AuthenticationError) as exc_info:
            https.ProxmoxHTTPAuth("bad_auth", "", base_url=self.base_url)

        # str() and repr() must both carry the same message
        expected_message = f"Couldn't authenticate user: bad_auth to {self.base_url}/access/ticket"
        assert str(exc_info.value) == expected_message
        assert repr(exc_info.value) == f'AuthenticationError("{expected_message}")'

    def test_auth_otp(self, mock_pve):
        """Constructing with an OTP for the "otp" user must not raise."""
        https.ProxmoxHTTPAuth(
            "otp", "password", base_url=self.base_url, otp="123456", service="PVE"
        )

    def test_auth_otp_missing(self, mock_pve):
        """Omitting the OTP for the "otp" user raises AuthenticationError about missing TFA."""
        with pytest.raises(core.AuthenticationError) as exc_info:
            https.ProxmoxHTTPAuth("otp", "password", base_url=self.base_url, service="PVE")

        # str() and repr() must both carry the same message
        expected_message = "Couldn't authenticate user: missing Two Factor Authentication (TFA)"
        assert str(exc_info.value) == expected_message
        assert repr(exc_info.value) == f'AuthenticationError("{expected_message}")'


class TestProxmoxHttpSession:
    """
    Tests the session object returned by https.Backend.get_session()

    Requests go to the endpoints mocked by ``mock_pve``; most tests inspect the
    JSON body of the reply for the method, URL, body and headers of the request.
    """

    base_url = PVERegistry.base_url
    _session = https.Backend("1.2.3.4", token_name="").get_session()

    @staticmethod
    def _iso_multipart_regex(size):
        """Build the regex for a multipart body whose "iso" field holds ``size`` "a" characters."""
        return f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n'

    def test_request_basic(self, mock_pve):
        echo_url = self.base_url + "/fake/echo"
        content = self._session.request("GET", echo_url).json()
        assert self._session.cert is None

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert content["body"] is None
        assert content["headers"]["accept"] == https.JsonSerializer().get_accept_types()

    def test_request_data(self, mock_pve):
        echo_url = self.base_url + "/fake/echo"
        content = self._session.request("GET", echo_url, data={"key": "value"}).json()

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert content["body"] == "key=value"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_monitor_command_list(self, mock_pve):
        monitor_url = self.base_url + "/nodes/node_name/qemu/100/monitor"
        reply = self._session.request("GET", monitor_url, data={"command": ["info", "block"]})

        assert reply.status_code == 400

    def test_request_exec_command_list(self, mock_pve):
        exec_url = self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        reply = self._session.request(
            "GET", exec_url, data={"command": ["echo", "hello", "world"]}
        )
        content = reply.json()

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        assert content["body"] == "command=echo&command=hello&command=world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_monitor_command_string(self, mock_pve):
        monitor_url = self.base_url + "/nodes/node_name/qemu/100/monitor"
        reply = self._session.request("GET", monitor_url, data={"command": "echo hello world"})
        content = reply.json()

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/monitor"
        # for the monitor endpoint the command string stays a single value
        assert content["body"] == "command=echo+hello+world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_exec_command_string(self, mock_pve):
        exec_url = self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        reply = self._session.request("GET", exec_url, data={"command": "echo hello world"})
        content = reply.json()

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec"
        # for the agent/exec endpoint the command string is split into one value per word
        assert content["body"] == "command=echo&command=hello&command=world"
        assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded"

    def test_request_file(self, mock_pve):
        size = 10
        with tempfile.TemporaryFile("w+b") as upload:
            upload.write(b"a" * size)
            upload.seek(0)
            reply = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": upload})
            content = reply.json()

        # decode multipart file
        m = re.match(self._iso_multipart_regex(size), content["body"])

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]

    def test_request_streaming(self, toolbelt_on_off, caplog, mock_pve):
        caplog.set_level(logging.INFO, logger=MODULE_LOGGER_NAME)

        # one byte above the threshold at which the upload is streamed
        size = https.STREAMING_SIZE_THRESHOLD + 1
        with tempfile.TemporaryFile("w+b") as upload:
            upload.write(b"a" * size)
            upload.seek(0)
            reply = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": upload})
            content = reply.json()

        # decode multipart file
        m = re.match(self._iso_multipart_regex(size), content["body"])

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]

        if not toolbelt_on_off:
            # with the toolbelt blocked, a hint to install it must be logged
            hint_record = (
                MODULE_LOGGER_NAME,
                logging.INFO,
                "Installing 'requests_toolbelt' will decrease memory used during upload",
            )
            assert caplog.record_tuples == [hint_record]

    def test_request_large_file(self, shrink_thresholds, toolbelt_on_off, caplog, mock_pve):
        # thresholds are lowered by shrink_thresholds, so this file stays small
        size = https.SSL_OVERFLOW_THRESHOLD + 1
        with tempfile.TemporaryFile("w+b") as upload:
            upload.write(b"a" * size)
            upload.seek(0)

            if not toolbelt_on_off:
                # 'requests_toolbelt' is blocked, so the oversized upload must be refused
                with pytest.raises(OverflowError) as exc_info:
                    self._session.request(
                        "GET", self.base_url + "/fake/echo", data={"iso": upload}
                    )

                assert str(exc_info.value) == "Unable to upload a payload larger than 2 GiB"
                warning_record = (
                    MODULE_LOGGER_NAME,
                    logging.WARNING,
                    "Install 'requests_toolbelt' to add support for files larger than 2GiB",
                )
                assert caplog.record_tuples == [warning_record]

            else:
                reply = self._session.request(
                    "GET", self.base_url + "/fake/echo", data={"iso": upload}
                )
                content = reply.json()

                # decode multipart file
                m = re.match(self._iso_multipart_regex(size), content["body"])

                assert content["method"] == "GET"
                assert content["url"] == self.base_url + "/fake/echo"
                assert m is not None  # content matches multipart for the created file
                assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]

    def test_request_filename(self, mock_pve):
        reply = self._session.request(
            "GET",
            self.base_url + "/fake/echo",
            files={"file1": "content"},
            serializer=https.JsonSerializer,
        )
        content = reply.json()

        # decode multipart file
        filename_regex = '--([0-9a-f]*)\r\nContent-Disposition: form-data; name="file1"; filename="file1"\r\n\r\ncontent\r\n--\\1--\r\n'
        m = re.match(filename_regex, content["body"])

        assert content["method"] == "GET"
        assert content["url"] == self.base_url + "/fake/echo"
        assert m is not None  # content matches multipart for the created file
        assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1]


# pylint: disable=protected-access
class TestJsonSerializer:
    """Tests the JsonSerializer class: accepted content types, loads() and loads_errors()."""

    _serializer = https.JsonSerializer()

    @staticmethod
    def _response_from(text):
        """Return a Response whose raw content is ``text`` encoded as UTF-8."""
        response = Response()
        response._content = text.encode("utf-8")
        return response

    def test_get_accept_types(self):
        ctypes = "application/json, application/x-javascript, text/javascript, text/x-javascript, text/x-json"
        assert ctypes == self._serializer.get_accept_types()

    def test_loads_pass(self):
        body = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}'

        parsed = self._serializer.loads(self._response_from(body))

        # only the "data" member is expected back
        assert parsed == {"key1": "value1", "key2": "value2"}

    def test_loads_not_json(self):
        body = "There was an error with the request"

        parsed = self._serializer.loads(self._response_from(body))

        # unparsable content is expected back as raw bytes under "errors"
        assert parsed == {"errors": b"There was an error with the request"}

    def test_loads_not_unicode(self):
        body = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80'

        parsed = self._serializer.loads(self._response_from(body))

        assert parsed == {"errors": body.encode("utf-8")}

    def test_loads_errors_pass(self):
        body = (
            '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}'
        )

        parsed = self._serializer.loads_errors(self._response_from(body))

        # only the "errors" member is expected back
        assert parsed == ["missing required param 1", "missing required param 2"]

    def test_loads_errors_not_json(self):
        # the comma between the two members is missing, so this is not valid JSON
        body = (
            '{"data": {} "errors": ["missing required param 1", "missing required param 2"]}'
        )

        parsed = self._serializer.loads_errors(self._response_from(body))

        assert parsed == {
            "errors": b'{"data": {} "errors": ["missing required param 1", "missing required param 2"]}'
        }

    def test_loads_errors_not_unicode(self):
        body = (
            '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}\x80'
        )

        parsed = self._serializer.loads_errors(self._response_from(body))

        assert parsed == {"errors": body.encode("utf-8")}


@pytest.fixture(params=(False, True))
def toolbelt_on_off(request, monkeypatch):
    """
    Runs the dependent test twice: once with 'requests_toolbelt' importable
    and once with its import blocked.

    Returns True if the module was left available, False if it was blocked.
    """
    toolbelt_available = request.param
    if not toolbelt_available:
        # a None entry in sys.modules makes a later import of that name fail
        monkeypatch.setitem(sys.modules, "requests_toolbelt", None)
    return toolbelt_available


@pytest.fixture
def shrink_thresholds():
    """Lower the https backend's streaming and SSL overflow thresholds for one test."""
    streaming_patch = mock.patch("proxmoxer.backends.https.STREAMING_SIZE_THRESHOLD", 100)
    overflow_patch = mock.patch("proxmoxer.backends.https.SSL_OVERFLOW_THRESHOLD", 1000)

    with streaming_patch, overflow_patch:
        yield


@pytest.fixture
def apply_none_service():
    """Swap SERVICES for a table holding only a "NONE" service with no backends or auths."""
    none_only_services = {
        "NONE": {
            "supported_backends": [],
            "supported_https_auths": [],
            "default_port": 1234,
        }
    }

    # the same table is patched into both modules that hold a SERVICES name
    core_patch = mock.patch("proxmoxer.core.SERVICES", none_only_services)
    https_patch = mock.patch("proxmoxer.backends.https.SERVICES", none_only_services)
    with core_patch, https_patch:
        yield
        yield
0707010000002D000081A4000000000000000000000001675E3B1A0000099F000000000000000000000000000000000000002C00000000proxmoxer-2.2.0/tests/test_https_helpers.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import tempfile

from proxmoxer.backends import https


class TestGetFileSize:
    """
    Tests for the get_file_size() function within proxmoxer.backends.https

    The expected result is always the number of bytes written, wherever the
    file position currently is.
    """

    def test_empty(self):
        with tempfile.TemporaryFile("w+b") as handle:
            assert https.get_file_size(handle) == 0

    def test_small(self):
        byte_count = 100
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            assert https.get_file_size(handle) == byte_count

    def test_large(self):
        byte_count = 10 * 1024 * 1024  # 10 MB
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            assert https.get_file_size(handle) == byte_count

    def test_half_seek(self):
        byte_count = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(byte_count // 2)
            assert https.get_file_size(handle) == byte_count

    def test_full_seek(self):
        byte_count = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(byte_count)
            assert https.get_file_size(handle) == byte_count


class TestGetFileSizePartial:
    """
    Tests for the get_file_size_partial() function within proxmoxer.backends.https

    The expected result is the number of bytes between the current file
    position and the end of the file.
    """

    def test_empty(self):
        with tempfile.TemporaryFile("w+b") as handle:
            assert https.get_file_size_partial(handle) == 0

    def test_small(self):
        byte_count = 100
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(0)
            assert https.get_file_size_partial(handle) == byte_count

    def test_large(self):
        byte_count = 10 * 1024 * 1024  # 10 MB
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(0)
            assert https.get_file_size_partial(handle) == byte_count

    def test_half_seek(self):
        byte_count = 200
        midpoint = byte_count // 2
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(midpoint)
            assert https.get_file_size_partial(handle) == midpoint

    def test_full_seek(self):
        byte_count = 200
        with tempfile.TemporaryFile("w+b") as handle:
            handle.write(b"a" * byte_count)
            handle.seek(byte_count)
            assert https.get_file_size_partial(handle) == 0
0707010000002E000081A4000000000000000000000001675E3B1A00000FC3000000000000000000000000000000000000002600000000proxmoxer-2.2.0/tests/test_imports.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import logging
import sys
from importlib import reload

import pytest


def test_missing_requests(requests_off, caplog):
    """Loading the https backend without 'requests' logs an error and exits with code 1."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.https as test_https

        # reload so the dependency check runs again now that `requests` is blocked
        reload(test_https)

    expected_record = (
        "proxmoxer.backends.https",
        logging.ERROR,
        "Chosen backend requires 'requests' module\n",
    )
    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [expected_record]


def test_missing_requests_tools_files(requests_off, caplog):
    """Loading the files tools without 'requests' logs an error and exits with code 1."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.tools.files as test_files

        # reload so the dependency check runs again now that `requests` is blocked
        reload(test_files)

    expected_record = (
        "proxmoxer.tools.files",
        logging.ERROR,
        "Files tools requires 'requests' module\n",
    )
    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [expected_record]


def test_missing_openssh_wrapper(openssh_off, caplog):
    """Loading the openssh backend without 'openssh_wrapper' logs an error and exits with code 1."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.openssh as test_openssh

        # reload so the dependency check runs again now that `openssh_wrapper` is blocked
        reload(test_openssh)

    expected_record = (
        "proxmoxer.backends.openssh",
        logging.ERROR,
        "Chosen backend requires 'openssh_wrapper' module\n",
    )
    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [expected_record]


def test_missing_paramiko_off(paramiko_off, caplog):
    """Loading the ssh_paramiko backend without 'paramiko' logs an error and exits with code 1."""
    with pytest.raises(SystemExit) as exit_exp:
        import proxmoxer.backends.ssh_paramiko as ssh_paramiko

        # reload so the dependency check runs again now that `paramiko` is blocked
        reload(ssh_paramiko)

    expected_record = (
        "proxmoxer.backends.ssh_paramiko",
        logging.ERROR,
        "Chosen backend requires 'paramiko' module\n",
    )
    assert exit_exp.value.code == 1
    assert caplog.record_tuples == [expected_record]


class TestCommandBase:
    """
    Checks `command_base.shell_join` output for several argument lists.

    Every test uses the `shlex_join_on_off` fixture, so each one runs both with
    and without `shlex.join` present; the module is reloaded after the fixture
    has been applied.
    """

    def test_join_empty(self, shlex_join_on_off):
        from proxmoxer.backends import command_base

        reload(command_base)

        joined = command_base.shell_join([])

        assert joined == ""

    def test_join_single(self, shlex_join_on_off):
        from proxmoxer.backends import command_base

        reload(command_base)

        joined = command_base.shell_join(["echo"])

        assert joined == "echo"

    def test_join_multiple(self, shlex_join_on_off):
        from proxmoxer.backends import command_base

        reload(command_base)

        joined = command_base.shell_join(["echo", "test"])

        assert joined == "echo test"

    def test_join_complex(self, shlex_join_on_off):
        from proxmoxer.backends import command_base

        reload(command_base)

        # an argument containing a space and double quotes is expected to come back single-quoted
        joined = command_base.shell_join(["echo", 'hello "world"'])

        assert joined == "echo 'hello \"world\"'"


@pytest.fixture()
def requests_off(monkeypatch):
    """Replace the `requests` entry in `sys.modules` with `None` for one test."""
    # a `None` entry in `sys.modules` makes a later `import requests` raise ImportError
    monkeypatch.setitem(sys.modules, "requests", None)


@pytest.fixture()
def openssh_off(monkeypatch):
    """Replace the `openssh_wrapper` entry in `sys.modules` with `None` for one test."""
    # a `None` entry in `sys.modules` makes a later `import openssh_wrapper` raise ImportError
    monkeypatch.setitem(sys.modules, "openssh_wrapper", None)


@pytest.fixture()
def paramiko_off(monkeypatch):
    """Replace the `paramiko` entry in `sys.modules` with `None` for one test."""
    # a `None` entry in `sys.modules` makes a later `import paramiko` raise ImportError
    monkeypatch.setitem(sys.modules, "paramiko", None)


@pytest.fixture(params=(False, True))
def shlex_join_on_off(request, monkeypatch):
    """
    Parametrized fixture: the requesting test runs twice, once with `shlex.join`
    removed (param False) and once with `shlex` left untouched (param True).

    Returns the param value: True when `shlex.join` was left alone, False when it was blocked.
    """
    join_allowed = request.param
    if join_allowed:
        return join_allowed

    shlex_module = sys.modules["shlex"]
    # only remove `join` when it exists; on py < 3.8 it is already missing
    if getattr(shlex_module, "join", None):
        monkeypatch.delattr(shlex_module, "join")
    return join_allowed
0707010000002F000081A4000000000000000000000001675E3B1A00000635000000000000000000000000000000000000002400000000proxmoxer-2.2.0/tests/test_local.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import tempfile

from proxmoxer.backends import local

# pylint: disable=no-self-use


class TestLocalBackend:
    """Checks the attributes of a default-constructed local backend."""

    def test_init(self):
        backend = local.Backend()

        assert backend.target == "localhost"
        assert isinstance(backend.session, local.LocalSession)


class TestLocalSession:
    """Checks file copying and command execution through `local.LocalSession`."""

    # one session shared by every test in this class
    _session = local.LocalSession()

    def test_upload_file_obj(self):
        payload = b"a" * 100

        with tempfile.NamedTemporaryFile("w+b") as source:
            with tempfile.NamedTemporaryFile("rb") as destination:
                source.write(payload)
                source.seek(0)
                self._session.upload_file_obj(source, destination.name)

                # rewind the source so its full content can be compared with the copy
                source.seek(0)

                assert source.read() == destination.read()

    def test_upload_file_obj_end(self):
        payload = b"a" * 100

        with tempfile.NamedTemporaryFile("w+b") as source:
            with tempfile.NamedTemporaryFile("rb") as destination:
                source.write(payload)
                # the cursor is deliberately left at the end of the source before copying
                self._session.upload_file_obj(source, destination.name)

                assert b"" == destination.read()

    def test_exec(self):
        script = (
            'import sys; sys.stdout.write("stdout content"); sys.stderr.write("stderr content")'
        )

        out, err = self._session._exec(["python3", "-c", script])

        assert out == "stdout content"
        assert err == "stderr content"
07070100000030000081A4000000000000000000000001675E3B1A000009F1000000000000000000000000000000000000002600000000proxmoxer-2.2.0/tests/test_openssh.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import tempfile
from unittest import mock

import openssh_wrapper
import pytest

from proxmoxer.backends import openssh

# pylint: disable=no-self-use


class TestOpenSSHBackend:
    """Checks the attributes of an openssh backend built from a host and a user."""

    def test_init(self):
        backend = openssh.Backend("host", "user")
        session = backend.session

        assert isinstance(session, openssh.OpenSSHSession)
        assert session.host == "host"
        assert session.user == "user"
        assert backend.target == "host"


class TestOpenSSHSession:
    """Checks construction, command execution and upload for `openssh.OpenSSHSession`."""

    # built at class-definition time; the tests below use their own session or the fixture
    _session = openssh.OpenSSHSession("host", "user")

    def test_init_all_args(self):
        with tempfile.NamedTemporaryFile("r") as config_file:
            with tempfile.NamedTemporaryFile("r") as identity_file:
                session = openssh.OpenSSHSession(
                    "host",
                    "user",
                    config_file=config_file.name,
                    port=123,
                    identity_file=identity_file.name,
                    forward_ssh_agent=True,
                )

                assert session.host == "host"
                assert session.user == "user"
                assert session.config_file == config_file.name
                assert session.port == 123
                assert session.identity_file == identity_file.name
                assert session.forward_ssh_agent is True

    def test_exec(self, mock_session):
        out, err = mock_session._exec(["echo", "hello", "world"])

        assert out == "stdout content"
        assert err == "stderr content"
        # the argument list reaches the SSH client as a single command string
        mock_session.ssh_client.run.assert_called_once_with(
            "echo hello world",
            forward_ssh_agent=True,
        )

    def test_upload_file_obj(self, mock_session):
        with tempfile.NamedTemporaryFile("r") as local_file:
            mock_session.upload_file_obj(local_file, "/tmp/file")

            # the file object is handed to scp wrapped in a one-element tuple
            mock_session.ssh_client.scp.assert_called_once_with(
                (local_file,),
                target="/tmp/file",
            )


@pytest.fixture
def mock_session():
    """Yield an `OpenSSHSession` created while `_connect` is patched with `_get_mock_ssh_conn`."""
    connect_patch = mock.patch(
        "proxmoxer.backends.openssh.OpenSSHSession._connect", _get_mock_ssh_conn
    )
    with connect_patch:
        yield openssh.OpenSSHSession("host", "user", forward_ssh_agent=True)


def _get_mock_ssh_conn(_):
    """
    Build a mock of `openssh_wrapper.SSHConnection`.

    The single positional argument is ignored. The mock's `run` returns an object
    carrying fixed `stdout`/`stderr` strings and its `scp` is a plain mock.
    """
    run_result = mock.Mock(stdout="stdout content", stderr="stderr content")

    connection = mock.Mock(spec=openssh_wrapper.SSHConnection)
    connection.run = mock.Mock(return_value=run_result)
    connection.scp = mock.Mock()

    return connection
07070100000031000081A4000000000000000000000001675E3B1A0000144B000000000000000000000000000000000000002700000000proxmoxer-2.2.0/tests/test_paramiko.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import os.path
import tempfile
from unittest import mock

import pytest

from proxmoxer.backends import ssh_paramiko

# pylint: disable=no-self-use


class TestParamikoBackend:
    """Checks the attributes of a paramiko backend built from a host and a user."""

    def test_init(self, mock_connect):
        backend = ssh_paramiko.Backend("host", "user")
        session = backend.session

        assert isinstance(session, ssh_paramiko.SshParamikoSession)
        assert session.host == "host"
        assert session.user == "user"
        assert backend.target == "host"


class TestSshParamikoSession:
    """Checks construction, connection arguments, execution and upload for `SshParamikoSession`."""

    def test_init_all_args(self, mock_connect):
        session = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="/tmp/key_file", port=1234
        )

        assert session.host == "host"
        assert session.user == "user"
        assert session.password == "password"
        assert session.private_key_file == "/tmp/key_file"
        assert session.port == 1234
        assert session.ssh_client == mock_connect()

    def test_connect_basic(self, mock_ssh_client):
        import paramiko

        session = ssh_paramiko.SshParamikoSession("host", "user", password="password", port=1234)
        client = session.ssh_client

        client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename=None,
            password="password",
            timeout=5,
            port=1234,
        )
        # the first host-key-policy call must have been given an AutoAddPolicy instance
        policy_args, _policy_kwargs = client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_args[0], paramiko.AutoAddPolicy)

    def test_connect_key_file(self, mock_ssh_client):
        import paramiko

        session = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="/tmp/key_file", port=1234
        )
        client = session.ssh_client

        client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename="/tmp/key_file",
            password="password",
            timeout=5,
            port=1234,
        )
        # the first host-key-policy call must have been given an AutoAddPolicy instance
        policy_args, _policy_kwargs = client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_args[0], paramiko.AutoAddPolicy)

    def test_connect_key_file_user(self, mock_ssh_client):
        import paramiko

        session = ssh_paramiko.SshParamikoSession(
            "host", "user", password="password", private_key_file="~/key_file", port=1234
        )
        client = session.ssh_client

        # a leading "~" in the key path is expected to arrive expanded to the home directory
        client.connect.assert_called_once_with(
            "host",
            username="user",
            allow_agent=False,
            look_for_keys=True,
            key_filename=os.path.expanduser("~") + "/key_file",
            password="password",
            timeout=5,
            port=1234,
        )
        policy_args, _policy_kwargs = client.set_missing_host_key_policy.call_args_list[0]
        assert isinstance(policy_args[0], paramiko.AutoAddPolicy)

    def test_exec(self, mock_ssh_client):
        client_mock, channel_mock, _ = mock_ssh_client

        session = ssh_paramiko.SshParamikoSession("host", "user")
        session.ssh_client = client_mock

        out, err = session._exec(["echo", "hello", "world"])

        assert out == "stdout contents"
        assert err == "stderr contents"
        channel_mock.exec_command.assert_called_once_with("echo hello world")

    def test_upload_file_obj(self, mock_ssh_client):
        client_mock, _, sftp_mock = mock_ssh_client

        session = ssh_paramiko.SshParamikoSession("host", "user")
        session.ssh_client = client_mock

        with tempfile.NamedTemporaryFile("r") as local_file:
            session.upload_file_obj(local_file, "/tmp/file")

            sftp_mock.putfo.assert_called_once_with(local_file, "/tmp/file")

        sftp_mock.close.assert_called_once_with()


@pytest.fixture
def mock_connect():
    """Patch `SshParamikoSession._connect` with a mock for one test and yield that mock."""
    connect_mock = mock.Mock(spec=ssh_paramiko.SshParamikoSession._connect)
    target = "proxmoxer.backends.ssh_paramiko.SshParamikoSession._connect"
    with mock.patch(target, connect_mock):
        yield connect_mock


@pytest.fixture
def mock_ssh_client():
    """
    Patch `paramiko.SSHClient` with a mock for one test.

    Yields a `(client, channel, sftp)` tuple of mocks. The client's transport opens
    the channel, and the channel's stdout/stderr streams return fixed byte strings.
    """
    # pylint: disable=import-outside-toplevel
    from paramiko import SFTPClient, SSHClient, Transport, channel

    # output streams handed back by the mocked channel
    stdout_stream = mock.Mock(spec=channel.ChannelFile)
    stdout_stream.read.return_value = b"stdout contents"
    stderr_stream = mock.Mock(spec=channel.ChannelStderrFile)
    stderr_stream.read.return_value = b"stderr contents"

    channel_mock = mock.Mock(spec=channel.Channel)
    channel_mock.makefile.return_value = stdout_stream
    channel_mock.makefile_stderr.return_value = stderr_stream

    transport_mock = mock.Mock(spec=Transport)
    transport_mock.open_session.return_value = channel_mock

    sftp_mock = mock.Mock(spec=SFTPClient)

    client_mock = mock.Mock(spec=SSHClient)
    client_mock.get_transport.return_value = transport_mock
    client_mock.open_sftp.return_value = sftp_mock

    with mock.patch("paramiko.SSHClient", client_mock):
        yield (client_mock, channel_mock, sftp_mock)
07070100000032000041ED000000000000000000000002675E3B1A00000000000000000000000000000000000000000000001C00000000proxmoxer-2.2.0/tests/tools07070100000033000081A4000000000000000000000001675E3B1A0000005C000000000000000000000000000000000000002800000000proxmoxer-2.2.0/tests/tools/__init__.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
07070100000034000081A4000000000000000000000001675E3B1A00003525000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/tests/tools/test_files.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"

import logging
import tempfile
from unittest import mock

import pytest

from proxmoxer import ProxmoxAPI, core
from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums

from ..api_mock import mock_pve  # pylint: disable=unused-import # noqa: F401
from ..files_mock import (  # pylint: disable=unused-import # noqa: F401
    mock_files,
    mock_files_and_pve,
)

MODULE_LOGGER_NAME = "proxmoxer.tools.files"


class TestChecksumInfo:
    """Checks the attributes and string forms of `ChecksumInfo`."""

    def test_basic(self):
        checksum = ChecksumInfo("name", 123)

        assert (checksum.name, checksum.hex_size) == ("name", 123)

    def test_str(self):
        assert str(ChecksumInfo("name", 123)) == "name"

    def test_repr(self):
        assert repr(ChecksumInfo("name", 123)) == "name (123 digits)"


class TestGetChecksum:
    """
    Checks the checksum-discovery helpers of `Files` against the `mock_files` fixture.

    Each helper is called both with and without an explicit `filename`.
    """

    def test_get_checksum_from_sibling_file_success(self, mock_files):
        file_url = "https://sub.domain.tld/sibling/file.iso"
        checksum_info = ChecksumInfo("testing", 16)

        implicit = Files._get_checksum_from_sibling_file(file_url, checksum_info=checksum_info)
        explicit = Files._get_checksum_from_sibling_file(
            file_url, checksum_info=checksum_info, filename="file.iso"
        )

        assert implicit == "this_is_the_hash"
        assert explicit == "this_is_the_hash"

    def test_get_checksum_from_sibling_file_fail(self, mock_files):
        file_url = "https://sub.domain.tld/sibling/missing.iso"
        checksum_info = ChecksumInfo("testing", 16)

        implicit = Files._get_checksum_from_sibling_file(file_url, checksum_info=checksum_info)
        explicit = Files._get_checksum_from_sibling_file(
            file_url, checksum_info=checksum_info, filename="missing.iso"
        )

        assert implicit is None
        assert explicit is None

    def test_get_checksum_from_extension_success(self, mock_files):
        file_url = "https://sub.domain.tld/extension/file.iso"
        checksum_info = ChecksumInfo("testing", 16)

        implicit = Files._get_checksum_from_extension(file_url, checksum_info=checksum_info)
        explicit = Files._get_checksum_from_extension(
            file_url, checksum_info=checksum_info, filename="file.iso"
        )

        assert implicit == "this_is_the_hash"
        assert explicit == "this_is_the_hash"

    def test_get_checksum_from_extension_fail(self, mock_files):
        file_url = "https://sub.domain.tld/extension/missing.iso"
        checksum_info = ChecksumInfo("testing", 16)

        missing = Files._get_checksum_from_extension(file_url, checksum_info=checksum_info)
        connection_error = Files._get_checksum_from_extension(
            file_url, checksum_info=checksum_info, filename="connectionerror.iso"
        )
        read_timeout = Files._get_checksum_from_extension(
            file_url, checksum_info=checksum_info, filename="readtimeout.iso"
        )

        assert missing is None
        assert connection_error is None
        assert read_timeout is None

    def test_get_checksum_from_extension_upper_success(self, mock_files):
        file_url = "https://sub.domain.tld/upper/file.iso"
        checksum_info = ChecksumInfo("testing", 16)

        implicit = Files._get_checksum_from_extension_upper(file_url, checksum_info=checksum_info)
        explicit = Files._get_checksum_from_extension_upper(
            file_url, checksum_info=checksum_info, filename="file.iso"
        )

        assert implicit == "this_is_the_hash"
        assert explicit == "this_is_the_hash"

    def test_get_checksum_from_extension_upper_fail(self, mock_files):
        file_url = "https://sub.domain.tld/upper/missing.iso"
        checksum_info = ChecksumInfo("testing", 16)

        implicit = Files._get_checksum_from_extension_upper(file_url, checksum_info=checksum_info)
        explicit = Files._get_checksum_from_extension_upper(
            file_url, checksum_info=checksum_info, filename="missing.iso"
        )

        assert implicit is None
        assert explicit is None

    def test_get_checksums_from_file_url_all_checksums(self, mock_files):
        file_url = "https://sub.domain.tld/checksums/file.iso"
        full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"

        for supported in SupportedChecksums:
            wanted = supported.value

            found_checksum, found_info = Files.get_checksums_from_file_url(
                file_url, preferred_type=wanted
            )[:2]

            # the expected checksum is the reference string cut to this type's hex length
            assert found_checksum == full_checksum_string[: wanted.hex_size]
            assert found_info == wanted

    def test_get_checksums_from_file_url_missing(self, mock_files):
        result = Files.get_checksums_from_file_url("https://sub.domain.tld/missing.iso")

        assert result[0] is None
        assert result[1] is None


class TestFiles:
    """Checks construction, `repr` and `get_file_info` of `Files`."""

    # API object shared by every test in this class
    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")

    def test_init_basic(self):
        files = Files(self.prox, "node1", "storage1")

        assert files._prox == self.prox
        assert files._node == "node1"
        assert files._storage == "storage1"

    def test_repr(self):
        files = Files(self.prox, "node1", "storage1")
        expected = (
            "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))"
        )

        assert repr(files) == expected

    def test_get_file_info_pass(self, mock_pve):
        files = Files(self.prox, "node1", "storage1")

        info = files.get_file_info("https://sub.domain.tld/file.iso")

        assert info["filename"] == "file.iso"
        assert info["mimetype"] == "application/x-iso9660-image"
        assert info["size"] == 123456

    def test_get_file_info_fail(self, mock_pve):
        files = Files(self.prox, "node1", "storage1")

        assert files.get_file_info("https://sub.domain.tld/invalid.iso") is None


class TestFilesDownload:
    """Checks `Files.download_file_to_storage` (and two `get_file_info` cases) against the mocks."""

    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
    f = Files(prox, "node1", "storage1")

    # task status every successful download test below expects back
    _DONE_TASK_STATUS = {
        "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
        "starttime": 1661825068,
        "user": "root@pam",
        "type": "vzdump",
        "pstart": 284768076,
        "status": "stopped",
        "exitstatus": "OK",
        "pid": 1044989,
        "id": "110",
        "node": "node1",
    }

    # error logged when only one of checksum / checksum_type is supplied
    _UNEVEN_CHECKSUM_RECORDS = [
        (
            MODULE_LOGGER_NAME,
            logging.ERROR,
            "Must pass both checksum and checksum_type or leave both None for auto-discovery",
        ),
    ]

    def test_download_discover_checksum(self, mock_files_and_pve, caplog):
        status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso")

        assert status == self._DONE_TASK_STATUS
        assert caplog.record_tuples == []

    def test_download_no_blocking(self, mock_files_and_pve, caplog):
        status = self.f.download_file_to_storage(
            "https://sub.domain.tld/checksums/file.iso", blocking_status=False
        )

        assert status == self._DONE_TASK_STATUS
        assert caplog.record_tuples == []

    def test_download_no_discover_checksum(self, mock_files_and_pve, caplog):
        caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME)

        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso")

        # the download still completes; only a warning about the skipped validation is logged
        assert status == self._DONE_TASK_STATUS
        assert caplog.record_tuples == [
            (
                MODULE_LOGGER_NAME,
                logging.WARNING,
                "Unable to discover checksum. Will not do checksum validation",
            ),
        ]

    def test_uneven_checksum(self, caplog, mock_files_and_pve):
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)

        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf")

        assert status is None
        assert caplog.record_tuples == self._UNEVEN_CHECKSUM_RECORDS

    def test_uneven_checksum_type(self, caplog, mock_files_and_pve):
        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)

        status = self.f.download_file_to_storage(
            "https://sub.domain.tld/file.iso", checksum_type="asdf"
        )

        assert status is None
        assert caplog.record_tuples == self._UNEVEN_CHECKSUM_RECORDS

    def test_get_file_info_missing(self, mock_pve):
        files = Files(self.prox, "node1", "storage1")

        assert files.get_file_info("https://sub.domain.tld/missing.iso") is None

    def test_get_file_info_non_iso(self, mock_pve):
        files = Files(self.prox, "node1", "storage1")

        info = files.get_file_info("https://sub.domain.tld/index.html")

        assert info["filename"] == "index.html"
        assert info["mimetype"] == "text/html"


class TestFilesUpload:
    """Checks `Files.upload_local_file_to_storage` for valid files, invalid paths and error paths."""

    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
    f = Files(prox, "node1", "storage1")

    def test_upload_no_file(self, mock_files_and_pve, caplog):
        result = self.f.upload_local_file_to_storage("/does-not-exist.iso")

        expected_records = [
            (
                MODULE_LOGGER_NAME,
                logging.ERROR,
                '"/does-not-exist.iso" does not exist or is not a file',
            ),
        ]
        assert result is None
        assert caplog.record_tuples == expected_records

    def test_upload_dir(self, mock_files_and_pve, caplog):
        with tempfile.TemporaryDirectory() as directory:
            result = self.f.upload_local_file_to_storage(directory)

            expected_records = [
                (
                    MODULE_LOGGER_NAME,
                    logging.ERROR,
                    f'"{directory}" does not exist or is not a file',
                ),
            ]
            assert result is None
            assert caplog.record_tuples == expected_records

    def test_upload_empty_file(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            result = self.f.upload_local_file_to_storage(filename=local_file.name)

            assert result is not None
            assert caplog.record_tuples == []

    def test_upload_non_empty_file(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("w+b") as local_file:
            local_file.write(b"a" * 100)
            local_file.seek(0)
            result = self.f.upload_local_file_to_storage(filename=local_file.name)

            assert result is not None
            assert caplog.record_tuples == []

    def test_upload_no_checksum(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            result = self.f.upload_local_file_to_storage(
                filename=local_file.name, do_checksum_check=False
            )

            assert result is not None
            assert caplog.record_tuples == []

    def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            result = self.f.upload_local_file_to_storage(filename=local_file.name)

            # the upload still returns a result; only a warning is logged
            expected_records = [
                (
                    MODULE_LOGGER_NAME,
                    logging.WARNING,
                    "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation",
                )
            ]
            assert result is not None
            assert caplog.record_tuples == expected_records

    def test_upload_non_blocking(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            result = self.f.upload_local_file_to_storage(
                filename=local_file.name, blocking_status=False
            )

            assert result is not None
            assert caplog.record_tuples == []

    def test_upload_proxmox_error(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            # same API object and node, but pointed at a storage named "missing"
            missing_storage = Files(self.f._prox, self.f._node, "missing")

            with pytest.raises(core.ResourceException) as raised:
                missing_storage.upload_local_file_to_storage(filename=local_file.name)

            error = raised.value
            assert error.status_code == 500
            assert error.status_message == "Internal Server Error"
            # assert exc_info.value.content == "storage 'missing' does not exist"

    def test_upload_io_error(self, mock_files_and_pve, caplog):
        with tempfile.NamedTemporaryFile("rb") as local_file:
            failing_open = mock.mock_open()
            failing_open.side_effect = IOError("ERROR MESSAGE")

            # every `open()` call made during the upload raises the IOError above
            with mock.patch("builtins.open", failing_open):
                result = self.f.upload_local_file_to_storage(filename=local_file.name)

            assert result is None
            assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")]


@pytest.fixture
def apply_no_checksums():
    """Patch `hashlib.algorithms_available` to an empty set for the duration of one test."""
    no_algorithms = set()
    with mock.patch("hashlib.algorithms_available", no_algorithms):
        yield
07070100000035000081A4000000000000000000000001675E3B1A000022B1000000000000000000000000000000000000002A00000000proxmoxer-2.2.0/tests/tools/test_tasks.py__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"

import logging

import pytest

from proxmoxer import ProxmoxAPI
from proxmoxer.tools import Tasks

from ..api_mock import mock_pve  # pylint: disable=unused-import # noqa: F401


class TestBlockingStatus:
    """
    Checks `Tasks.blocking_status` for finished, zeroed, interrupted and still-running tasks.

    Each test captures the `proxmoxer.core` logger at DEBUG level and compares the
    exact request/response records that were logged.
    """

    def test_basic(self, mocked_prox, caplog):
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
        upid = "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done"

        status = Tasks.blocking_status(mocked_prox, upid)

        assert status == {
            "upid": upid,
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "OK",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                logging.INFO,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status",
            ),
            (
                "proxmoxer.core",
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'',
            ),
        ]

    def test_zeroed(self, mocked_prox, caplog):
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
        upid = "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment"

        status = Tasks.blocking_status(mocked_prox, upid)

        assert status == {
            "upid": upid,
            "node": "node",
            "pid": 0,
            "pstart": 0,
            "starttime": 0,
            "type": "task",
            "id": "id",
            "user": "root@pam",
            "status": "stopped",
            "exitstatus": "OK",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                logging.INFO,
                "GET https://1.2.3.4:1234/api2/json/nodes/node/tasks/UPID:node:00000000:00000000:00000000:task:id:root@pam:comment/status",
            ),
            (
                "proxmoxer.core",
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK"}}\'',
            ),
        ]

    def test_killed(self, mocked_prox, caplog):
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
        upid = "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped"

        status = Tasks.blocking_status(mocked_prox, upid)

        assert status == {
            "upid": upid,
            "starttime": 1661825068,
            "user": "root@pam",
            "type": "vzdump",
            "pstart": 284768076,
            "status": "stopped",
            "exitstatus": "interrupted by signal",
            "pid": 1044989,
            "id": "110",
            "node": "node1",
        }
        assert caplog.record_tuples == [
            (
                "proxmoxer.core",
                logging.INFO,
                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped/status",
            ),
            (
                "proxmoxer.core",
                logging.DEBUG,
                'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1"}}\'',
            ),
        ]

    def test_timeout(self, mocked_prox, caplog):
        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")

        status = Tasks.blocking_status(
            mocked_prox,
            "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
            timeout=0.021,
            polling_interval=0.01,
        )

        request_record = (
            "proxmoxer.core",
            logging.INFO,
            "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
        )
        response_record = (
            "proxmoxer.core",
            logging.DEBUG,
            'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
        )

        assert status is None
        # three identical request/response pairs are expected before the call gives up
        assert caplog.record_tuples == [request_record, response_record] * 3


class TestDecodeUpid:
    """Checks for ``Tasks.decode_upid`` on well-formed and malformed UPID strings."""

    def test_basic(self):
        """A UPID whose id and comment fields are empty yields empty strings for them."""
        raw = "UPID:node:000CFC5C:03E8D0C3:6194806C:aptupdate::root@pam:"
        expected = {
            "upid": raw,
            "node": "node",
            "pid": 851036,
            "pstart": 65589443,
            "starttime": 1637122156,
            "type": "aptupdate",
            "id": "",
            "user": "root@pam",
            "comment": "",
        }

        result = Tasks.decode_upid(raw)

        # Compare key by key so extra keys in the result are tolerated.
        for field, value in expected.items():
            assert result[field] == value

    def test_all_values(self):
        """A UPID with every field populated is decoded field by field."""
        raw = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:local"
        expected = {
            "upid": raw,
            "node": "node1",
            "pid": 851962,
            "pstart": 65597267,
            "starttime": 1637122234,
            "type": "vzdump",
            "id": "103",
            "user": "root@pam",
            "comment": "local",
        }

        result = Tasks.decode_upid(raw)

        # Compare key by key so extra keys in the result are tolerated.
        for field, value in expected.items():
            assert result[field] == value

    def test_invalid_length(self):
        """A UPID with the trailing comment separator missing is rejected."""
        truncated = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam"

        with pytest.raises(AssertionError) as raised:
            Tasks.decode_upid(truncated)

        message = str(raised.value)
        assert message == "UPID is not in the correct format"

    def test_invalid_start(self):
        """A string that does not begin with the ``UPID`` marker is rejected."""
        wrong_prefix = "ASDF:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:"

        with pytest.raises(AssertionError) as raised:
            Tasks.decode_upid(wrong_prefix)

        message = str(raised.value)
        assert message == "UPID is not in the correct format"


class TestDecodeLog:
    """Checks for ``Tasks.decode_log`` turning a list of log entries into text."""

    def test_basic(self):
        """Entries supplied in line order are expected joined by newlines."""
        entries = [
            {"n": 1, "t": "client connection: 127.0.0.1:49608"},
            {"t": "TASK OK", "n": 2},
        ]

        assert Tasks.decode_log(entries) == "client connection: 127.0.0.1:49608\nTASK OK"

    def test_empty(self):
        """An empty entry list is expected to produce an empty string."""
        assert Tasks.decode_log([]) == ""

    def test_unordered(self):
        """Entries are expected in order of their ``n`` value, not list position."""
        entries = [
            {"n": 3, "t": "third"},
            {"t": "first", "n": 1},
            {"t": "second", "n": 2},
        ]

        assert Tasks.decode_log(entries) == "first\nsecond\nthird"


@pytest.fixture
def mocked_prox(mock_pve):
    """Provide a ``ProxmoxAPI`` client pointed at the fake host ``1.2.3.4:1234``.

    ``mock_pve`` is requested but never used in the body, so it is needed only
    for its setup. NOTE(review): presumably it stubs the HTTP backend; confirm
    against the fixture's definition.
    """
    client = ProxmoxAPI("1.2.3.4:1234", user="user", password="password")
    return client
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!384 blocks
openSUSE Build Service is sponsored by