File orthos2-1.2.31+git.5a38e82.obscpio of Package orthos-client
07070100000000000081ED00000000000000000000000161D832210000B64B000000000000000000000000000000000000002300000000orthos2-1.2.31+git.5a38e82/orthos2#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Author Jan Löser <jloeser@suse.de>
# Published under the GNU Public Licence 2
import argparse
import atexit
import fcntl
import getpass
import json
import logging
import os
import pwd
import re
import readline
import struct
import subprocess
import sys
import termios
from datetime import date, datetime
import pytz
if sys.version_info.major == 3:
import urllib.request as urllib_request
from configparser import RawConfigParser
from urllib.parse import urlencode
PYTHON_VERSION = 3
else:
print("Python version not supported!")
sys.exit(1)
# Client version; shown in the prompt and printed by -v/--version.
VERSION = '2.0.0'
# Connection defaults used by Config when neither the command line nor a
# config file provides a value.
DEFAULT_PROTOCOL = 'https'
DEFAULT_SERVERNAME = 'localhost'
# NOTE: a string, whereas Config.get_port() returns an int when the port is
# read from a config file.
DEFAULT_PORT = '443'
# Template for the API base URL; filled with protocol, server and port.
API_URL_FORMAT = '{0}://{1}:{2}/api'
# Per-user and system-wide config files; Config reads both.
USER_CONFIGFILE = '~/.config/orthosrc'
SYSTEM_CONFIGFILE = '/etc/orthosrc'
# Default readline history file (a 'history' option in [global] overrides it).
USER_HISTORYFILE = '~/.orthos_history'
# Interactive prompt template; filled with the client version and a user name.
PROMPT = '(orthos {0}:{1})'
# Transient text printed while a request is in flight (unless quiet).
PROMPT_LOADING = 'Please wait...'
# Width of the label column in Info output.
LEFT_MARGIN = 30
# Extra width added to every Table column.
COLUMN_PADDING = 2
# Default for -D/--debug.
DEBUG = False
# Selects symbols ('✓'/'x') or words ('yes'/'no') when formatting booleans.
UTF8 = True
# Fallback time zone when none (or an unknown one) is configured.
TIME_ZONE = 'Europe/Berlin'
# Default for -p/--plain-output; main() overwrites it when -p is given.
PLAIN_OUTPUT = False
# Field separator appended after each value in plain table output.
IFS = os.environ.get('OIFS', '')
# Built-in aliases copied into every Config instance.
ALIASES = {}
# strftime format used when displaying timestamps.
DATETIME_FORMAT = '%Y-%m-%d %H:%M'
# strptime formats that format_value() tries, in order, on string values.
DATETIME_INPUT_FORMATS = [
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%SZ'
]
# The single Orthos instance; created in main().
orthos = None
class Type:
    """Values of the ``type`` entry of an API response header (see APIResponse)."""

    INFO, TABLE, SELECT = 'INFO', 'TABLE', 'SELECT'
    MESSAGE, AUTHREQUIRED, INPUT = 'MESSAGE', 'AUTHREQUIRED', 'INPUT'
class InputType:
    """Field types distinguished by Input._clean_value()."""

    INTEGER, STRING, BOOLEAN = 'INTEGER', 'STRING', 'BOOLEAN'
    DATE, SELECTION = 'DATE', 'SELECTION'
class HTTPStatus:
    """Numeric HTTP status codes."""

    OK, BADREQUEST, NOTFOUND = 200, 400, 404
def format_value(value):
    """Format values human readable.

    Examples:

        Python type | Value        | UTF8       | !UTF8      | Plain output ('-p')
        ------------|--------------|------------|------------|--------------------
        str         | ''           | '-'        | '-'        | ''
        bool        | True         | '✓'        | 'yes'      | 'true'
        bool        | False        | 'x'        | 'no'       | 'false'
        (NoneType)  | None         | '-'        | '-'        | 'none'

    Strings matching one of DATETIME_INPUT_FORMATS are interpreted as UTC
    timestamps and returned in the configured time zone, formatted with
    DATETIME_FORMAT; a timestamp on the maximum date is returned as
    'infinite'. Values of any other type are returned unchanged.
    """
    if value is None:
        return 'none' if PLAIN_OUTPUT else '-'

    # Python2: check for 'unicode' here
    if isinstance(value, str) or type(value).__name__ == 'unicode':
        if PLAIN_OUTPUT:
            return value

        # Expected formats:
        #   a) `2017-08-09T11:22:33.123456Z` (with microseconds)
        #   b) `2017-08-09T11:22:33Z`
        #      `9999-12-31T00:00:00Z` (=infinite)
        datetime_value = None
        for input_format in DATETIME_INPUT_FORMATS:
            try:
                datetime_value = datetime.strptime(value, input_format)
            except ValueError:
                continue
            break

        # Deliberately not done in a `finally:` block: returning from
        # `finally` would silently discard any exception still in flight.
        if datetime_value is not None:
            if datetime_value.date() == date.max:
                return 'infinite'
            try:
                # make tz aware (UTC), return as local time
                datetime_value = datetime_value.replace(tzinfo=pytz.UTC)
                target_timezone = pytz.timezone(orthos.config.get_timezone())
                return datetime_value.astimezone(target_timezone).strftime(DATETIME_FORMAT)
            except OverflowError:
                # Conversion left the representable range; show the raw string.
                pass

        if not value:
            return '-'

    if isinstance(value, bool):
        if PLAIN_OUTPUT:
            return str(value).lower()
        if value:
            return '✓' if UTF8 else 'yes'
        return 'x' if UTF8 else 'no'

    return value
def get_prompt(user):
    """Return the interactive prompt text showing the client version and *user*."""
    prompt_fields = (VERSION, user)
    return PROMPT.format(*prompt_fields)
class Command:
    """A command offered by the server or added locally by the client.

    Instances compare case-insensitively by command name, both with each
    other and with plain strings, so ``'HELP' in commands`` and
    ``commands.index('help')`` work on a list of Command objects.
    """

    # Defining __eq__ already makes instances unhashable; stated explicitly
    # because equality with differently-cased strings cannot be hashed
    # consistently.
    __hash__ = None

    def __init__(self, *args):
        """Create a command from ``(name, attributes)``.

        ``attributes`` is a dict; the keys read here are 'help', 'docstring',
        'tabcompletion', 'url', 'method' (default 'GET') and 'arguments'.
        """
        name, attributes = args[0], args[1]
        self._command = name
        self._help = attributes.get('help', None)
        self._docstring = attributes.get('docstring', None)
        self._tabcompletion = attributes.get('tabcompletion', None)
        self._url = attributes.get('url', None)
        self._method = attributes.get('method', 'GET')
        # A list, so as_input() works even before the first send().
        self._recent_arguments = []
        self._argument_lists = attributes.get('arguments', None)

    def __repr__(self):
        return '<Command: {0}>'.format(str(self._command))

    def __str__(self):
        return self._command.lower()

    def __lt__(self, other):
        """Order commands alphabetically by lower-cased name."""
        return str(self) < str(other).lower()

    def __eq__(self, other):
        """Compare case-insensitively with another Command or a string."""
        if isinstance(other, Command):
            return str(self) == str(other)
        if isinstance(other, str):
            return str(self) == other.lower()
        return NotImplemented

    def get_tabcompletion(self):
        return self._tabcompletion

    def get_help(self):
        return self._help if self._help else '-'

    def get_docstring(self):
        return self._docstring if self._docstring else 'No help available!'

    def get_url(self):
        return self._url

    def get_method(self):
        return self._method

    def get_matching_argument_list(self, length):
        """Return matching argument list.

        Return the respective argument list according to the number of given
        arguments, None otherwise. A trailing asterisk indicates a raw
        argument transfer; such a single-element list matches any length.
        """
        if self._argument_lists is not None:
            for argument_list in self._argument_lists:
                if len(argument_list) == 1 and argument_list[0][-1:] == '*':
                    return argument_list
                if len(argument_list) == length:
                    return argument_list
        return None

    def get_argument_lists(self):
        return self._argument_lists

    def get_name(self):
        return self._command

    def _request_arguments(self, raw_arguments):
        """Map the most recent arguments onto the matching argument list."""
        matching_argument_list = self.get_matching_argument_list(len(self._recent_arguments))
        if matching_argument_list and matching_argument_list[0][-1:] == '*':
            # Raw transfer: the whole argument string goes under one key
            # (the list entry without its trailing asterisk).
            return {matching_argument_list[0][:-1]: raw_arguments}
        return dict(zip(matching_argument_list, self._recent_arguments))

    def send(self, orthos, raw_arguments=''):
        """Send a request according to the command's HTTP method.

        For re-executing a command with its arguments (e.g. after
        authorization was requested), the whitespace-split arguments are
        saved in the command object here.

        For GET requests the arguments are URL-encoded and appended as the
        query string; for POST requests they are passed as the request data.
        Any other method sends nothing and returns None.

        Callers are expected to check get_matching_argument_list() first;
        without a matching argument list this raises TypeError.
        """
        url = API_URL + self._url
        self._recent_arguments = raw_arguments.strip().split()

        if self._method == 'POST':
            return orthos.API.request('POST', url, data=self._request_arguments(raw_arguments))
        if self._method == 'GET':
            arguments = urlencode(self._request_arguments(raw_arguments))
            if arguments:
                url += '?{}'.format(arguments)
            return orthos.API.request('GET', url)
        return None

    def as_input(self):
        """Return the command with its arguments as a list."""
        return [str(self)] + self._recent_arguments
class APIResponse:
    """Generic API response that re-types itself from the header's ``type``.

    After construction the instance's class is the specialised response class
    for a known type; for an unknown type it stays a plain APIResponse.
    """

    def __init__(self, header, data):
        self._header = header
        self._data = data
        self._type = self._header['type']
        logging.debug(self._header)
        logging.debug(self._data)

        # (response type, specialised class, whether that class defines an
        # argument-less __init__ that has to run after the class switch)
        specialisations = (
            (Type.SELECT, Select, False),
            (Type.INFO, Info, True),
            (Type.TABLE, Table, True),
            (Type.MESSAGE, Message, False),
            (Type.AUTHREQUIRED, AuthRequired, False),
            (Type.INPUT, Input, True),
        )
        for response_type, response_class, run_init in specialisations:
            if self._type == response_type:
                self.__class__ = response_class
                if run_init:
                    self.__init__()
                break

    @property
    def text(self):
        """The response rendered by the specialised class's output()."""
        return self.output()
class Input(APIResponse):
    """Interactive form.

    Prompts for every field named in the header's 'order' list and POSTs the
    collected values as ``{'form': data}`` to the header's 'target' URL.
    """

    def __init__(self):
        self._order = self._header['order']
        self._target_url = self._header['target']

    def _clean_value(self, field, input_value, required, items=None):
        """Validate and cast input values.

        Value needs to be of the required type and needs to be set if the
        value is required at all. For selections, the 'value' of the item
        with the entered number is returned. An empty, non-required integer
        yields None. Unknown field types yield None.

        Raises ValueError if the value is missing or malformed.
        """
        input_type = field['type'].upper()

        if required and not input_value:
            raise ValueError("Value is required!")

        if input_type == InputType.INTEGER:
            if input_value.strip() == '':
                # Nothing entered for an optional field: there is no number
                # to range-check (comparing None with min/max would raise).
                return None
            try:
                value = int(input_value)
            except ValueError:
                raise ValueError("'{0}' is not an integer!".format(str(input_value)))
            max_value = field.get('max', None)
            min_value = field.get('min', None)
            if (max_value is not None) and (value > max_value):
                raise ValueError("Value must be equal less than {0}.".format(max_value))
            if (min_value is not None) and (value < min_value):
                raise ValueError("Value must be equal greater than {0}.".format(min_value))
            return value

        if input_type == InputType.SELECTION:
            try:
                index = int(input_value)
            except ValueError:
                raise ValueError("'{0}' is not a number!".format(str(input_value)))
            # A selection without items has no valid index at all.
            items = items or []
            if not 0 <= index < len(items):
                raise ValueError("Selection out of range!")
            # Each item is a single-key dict: {name: {'label': ..., 'value': ...}}
            selected = items[index]
            selected_name = list(selected.keys())[0]
            return selected[selected_name].get('value', None)

        if input_type in (InputType.STRING, InputType.DATE):
            return input_value

        if input_type == InputType.BOOLEAN:
            answer = input_value.lower()
            if answer in {'1', 'y', 'yes', 'true'}:
                return True
            if answer in {'0', 'n', 'no', 'false'}:
                return False
            raise ValueError("Value {0} is not a boolean; use 'y' or 'n'!".format(str(input_value)))

        return None

    def process(self):
        """Execute API result.

        Returns None; this includes the case where the user aborts with
        Ctrl-C or EOF, in which nothing is sent.
        """
        data = {}
        i = 0
        # An index loop (not `for`) because an invalid answer re-prompts the
        # same field: `continue` below leaves `i` untouched.
        while i < len(self._order):
            field_name = self._order[i]
            field = self._data[field_name]

            # print selection (type compared case-insensitively, exactly as
            # _clean_value() does)
            if field['type'].upper() == InputType.SELECTION:
                for number, item in enumerate(field['items']):
                    item_name = list(item.keys())[0]
                    print('{0:>3}) {1:<30}'.format(number, item[item_name]['label']))

            prompt = '{0}>'.format(field['prompt'])
            required = field['required']
            hidden = field.get('hidden', None)
            logging.debug(field)

            if hidden:
                # Hidden fields are not prompted; their initial value is sent as is.
                input_ = field.get('initial', None)
            else:
                try:
                    input_ = orthos.LineReader.readline(
                        prompt=prompt,
                        default=field.get('initial', None),
                        history=False
                    )
                    input_ = self._clean_value(
                        field,
                        input_,
                        required,
                        items=field.get('items', None)
                    )
                except (KeyboardInterrupt, EOFError):
                    print("\nAborted by user!")
                    return None
                except ValueError as e:
                    print("ERROR: {0}".format(str(e)))
                    continue

            data[field_name] = input_
            i = i + 1

        response = orthos.API.request('POST', API_URL + self._target_url, {'form': data})
        if response[0]:
            apiresponse = APIResponse(*response)
            apiresponse.process()
class AuthRequired(APIResponse):
    """Response indicating that the server asked for authentication."""

    def process(self):
        """Authenticate unless a token is already set; return the recent command."""
        already_authenticated = orthos.is_authenticated()
        if not already_authenticated:
            orthos.authenticate()
        return orthos.recent_command
class Select(APIResponse):
    """Response carrying a titled, numbered list of entries."""

    def output(self):
        """Return the title, a blank line and one numbered line per entry."""
        parts = ['{0}\n\n'.format(self._header['title'])]
        parts.extend(
            '{0:>3}) {1:<30}\n'.format(number, entry['value'])
            for number, entry in enumerate(self._data)
        )
        return ''.join(parts)

    def process(self):
        """Show the rendered list on the terminal."""
        rendered = self.output()
        orthos.Terminal.show(rendered)
class Message(APIResponse):
    """Response carrying a single message with an optional type prefix."""

    def output(self):
        """Return 'TYPE: message', or just the message when no type is set."""
        message_type = self._data.get('type', None)
        if not message_type:
            return '{0}'.format(self._data['message'])
        return '{0}: {1}'.format(message_type, self._data['message'])

    def process(self):
        """Show the message; INFO messages are suppressed in quiet mode."""
        # .get(): a message without a type is accepted by output(), so it
        # must not raise KeyError here either.
        if orthos.config.is_quiet() and self._data.get('type', None) == 'INFO':
            return
        orthos.Terminal.show(self.output())
class Table(APIResponse):
    """Response rendered as a table.

    The header's 'theader' is a list of single-key dicts ``{field: caption}``;
    the data is a list of rows indexed by those field names.
    """

    def __init__(self):
        self._theader = self._header['theader']
        self._captions = [list(item.values())[0] for item in self._theader]
        self._fields = [list(item.keys())[0] for item in self._theader]
        # Column width: the longer of the caption and the widest raw value.
        self._widths = [len(caption) for caption in self._captions]
        for row in self._data:
            for i, field in enumerate(self._fields):
                self._widths[i] = max(self._widths[i], len(str(row[field])))

    def _get_line(self):
        """Draw a horizontal line with a correlating width."""
        if PLAIN_OUTPUT:
            return ''
        return '-' * (sum(self._widths) + len(self._widths) * COLUMN_PADDING) + '\n'

    def _get_theader(self):
        """Draw the table header."""
        if PLAIN_OUTPUT:
            return ''
        cells = [
            ' {0:<{1}}'.format(caption, self._widths[i] + COLUMN_PADDING - 1)
            for i, caption in enumerate(self._captions)
        ]
        return ''.join(cells) + '\n'

    def output(self):
        """Generate output.

        In plain mode every value is followed by IFS and no header or rules
        are drawn; otherwise values are left-aligned in padded columns.
        """
        parts = [self._get_line(), self._get_theader(), self._get_line()]
        for item in self._data:
            cells = []
            for i, field in enumerate(self._fields):
                value = format_value(item[field])
                if PLAIN_OUTPUT:
                    cells.append('{0}{1}'.format(value, IFS))
                else:
                    # str(): format_value() returns lists/dicts unchanged, and
                    # those reject a width/alignment format spec (TypeError).
                    cells.append(
                        ' {0:<{1}}'.format(str(value), self._widths[i] + COLUMN_PADDING - 1)
                    )
            parts.append(''.join(cells) + '\n')
        parts.append(self._get_line())
        return ''.join(parts).rstrip('\n')

    def process(self):
        """Execute API result."""
        orthos.Terminal.show(self.output())
class Info(APIResponse):
    """Response rendered as 'label: value' lines in the order given by the header."""

    def __init__(self):
        self._order = self._header['order']

    def format_line(self, item, data=None):
        """Render one entry.

        With a dict, *item* is a key and a single 'label: value' line is
        returned. With a list, *item* is a sequence of keys and every element
        is rendered as a numbered group of lines followed by a blank line.
        Anything else yields an empty string.
        """
        result = ''
        if isinstance(data, dict):
            entry = data[item]
            value = format_value(entry['value'])
            if isinstance(value, str):
                value = value.replace('\n', ' ')
            result = "{0:<{1}}: {2}\n".format(entry['label'], LEFT_MARGIN, value)
        elif isinstance(data, list):
            for number, element in enumerate(data):
                for position, subitem in enumerate(item):
                    value = format_value(element[subitem]['value'])
                    # Only the first line of a group carries the element number.
                    if position > 0:
                        lead = " " * 3
                    else:
                        lead = "{0:<3}".format(number)
                    result += lead + "{0:<{1}}: {2}\n".format(
                        element[subitem]['label'],
                        LEFT_MARGIN - 3,
                        value
                    )
                result += '\n'
        return result

    def output(self, order=None, data=None):
        """Render all entries of *order* (default: the header's order).

        A None entry draws a rule; a list entry ``[key, subkeys]`` draws a
        centred caption followed by the nested values; any other entry is a
        plain key, skipped when missing from *data*.
        """
        order = self._order if order is None else order
        data = self._data if data is None else data

        result = '-' * Terminal().width + '\n'
        for item in order:
            if item is None:
                result += '-' * Terminal().width + '\n'
            elif isinstance(item, list):
                section_key, section_fields = item[0], item[1]
                result += '{0:-^{1}}\n'.format(
                    ' ' + data[section_key]['label'] + ' ',
                    Terminal().width
                )
                result += self.format_line(section_fields, data[section_key]['value'])
            elif item in data.keys():
                result += self.format_line(item, data)
        return result

    def process(self):
        """Show the rendered information on the terminal."""
        rendered = self.output()
        orthos.Terminal.show(rendered)
class Terminal:
    """Access properties of the terminal."""

    def _window_size(self):
        """Return the TIOCGWINSZ reply for stdout as a 4-tuple, or None on error.

        The first two entries are rows and columns.
        """
        request = struct.pack('HHHH', 0, 0, 0, 0)
        try:
            reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        except IOError:
            return None
        return struct.unpack('HHHH', reply)

    @property
    def width(self):
        """Number of columns; 80 if the size cannot be queried."""
        size = self._window_size()
        return 80 if size is None else size[1]

    @property
    def height(self):
        """Number of rows; 25 if the size cannot be queried."""
        size = self._window_size()
        return 25 if size is None else size[0]

    def pager(self, text):
        """Pipe *text* through ``less -S`` and wait for the pager to exit."""
        pager = subprocess.Popen(
            'less -S',
            shell=True,
            stdin=subprocess.PIPE
        )
        # If the pager exits before it has consumed all input, writing or
        # flushing raises BrokenPipeError. That is a normal way to leave the
        # pager and must not propagate.
        try:
            pager.stdin.write(bytes(text, 'utf-8'))
        except BrokenPipeError:
            pass
        finally:
            try:
                pager.stdin.close()
            except BrokenPipeError:
                pass
        pager.wait()

    def need_pager(self, lines, columns):
        """Decide whether output of the given size goes through the pager.

        Follows the configured pager policy; any value other than 'always',
        'never', 'vertical' or 'horizontal' pages when the output is too
        tall or too wide.
        """
        pager_policy = orthos.config.get_use_pager()
        if pager_policy == 'always':
            return True
        if pager_policy == 'never':
            return False
        if pager_policy == 'vertical':
            return self.height < lines
        if pager_policy == 'horizontal':
            return self.width < columns
        return self.height < lines or self.width < columns

    def show(self, text, lines=None, columns=None):
        """Print *text*, or page it if need_pager() says so.

        Unless both *lines* and *columns* are given (and non-zero), they are
        measured from *text*.
        """
        if not lines or not columns:
            text_lines = text.splitlines()
            lines = len(text_lines)
            columns = max((len(line) for line in text_lines), default=0)
        if self.need_pager(lines, columns):
            self.pager(text)
        else:
            print(text)
# from https://pymotw.com/2/readline/
class TabCompleter:
    """readline completer.

    *options* maps the first word of a line to the candidates for the
    following words; the keys themselves are the candidates for the first word.
    """

    def __init__(self, options):
        self._current_options = []
        self._options = options

    def _find_candidates(self):
        """Return the list of candidates for the current readline buffer."""
        line = readline.get_line_buffer()
        begin = readline.get_begidx()
        end = readline.get_endidx()
        being_completed = line[begin:end]
        words = line.split()

        if not words:
            return sorted(self._options.keys())

        try:
            if begin == 0:
                # Completing the first word.
                options = self._options.keys()
            else:
                options = self._options[words[0]]
        except (KeyError, IndexError):
            return []

        # Always hand back a real list: a dict view is not subscriptable and
        # a command without tab completion is stored as None; both used to
        # raise TypeError in complete().
        options = list(options or [])
        if being_completed:
            return [word for word in options if word.startswith(being_completed)]
        return options

    def complete(self, text, state):
        """Return the completion.

        Candidates are computed when *state* is 0 and then returned one per
        call; None signals that there are no more candidates.
        """
        if state == 0:
            self._current_options = self._find_candidates()
        try:
            return self._current_options[state]
        except IndexError:
            return None
class OrthosLineReader:
    """Line input on top of readline with history, completion and defaults."""

    def __init__(self, orthos, history=None):
        """Set up readline.

        *history* is an optional history file: it is loaded now (a read error
        is ignored) and written back at interpreter exit. Tab completion is
        bound to ``orthos.Tabcompleter``.
        """
        self.orthos = orthos
        self.prompt = ''
        self.completion = True
        if history:
            try:
                readline.read_history_file(history)
            except IOError:
                pass
            atexit.register(readline.write_history_file, history)
        readline.set_completer_delims(' ')
        readline.set_completer(orthos.Tabcompleter.complete)
        readline.parse_and_bind('tab: complete')

    def disable_completion(self):
        self.completion = False

    def enable_completion(self):
        self.completion = True

    def hook(self):
        """Pre-input hook: pre-fill the input line with the current default."""
        # str(): the default may be a non-string value; the fallback prompt
        # in readline() stringifies it as well.
        readline.insert_text(str(self.default))
        readline.redisplay()

    def set_prompt(self, prompt):
        self.prompt = prompt

    def readline(self, prompt=None, default=None, history=True, complete=True):
        """Read one line from the user and return it.

        *prompt* falls back to the prompt set with set_prompt(). A truthy
        *default* is pre-filled into the input line or, where readline has no
        pre-input hook, shown in brackets after the prompt. With
        ``history=False`` a non-empty answer is removed from the readline
        history again (only when stdin is a terminal).

        KeyboardInterrupt and EOFError from input() propagate to the caller.
        """
        old_completion = self.completion
        if not complete:
            self.disable_completion()

        prompt_suffix = ''
        if default:
            try:
                readline.set_pre_input_hook(self.hook)
            except AttributeError:
                prompt_suffix = ' [' + str(default) + ']'
            self.default = default

        if not prompt:
            prompt = self.prompt
        prompt += prompt_suffix
        if prompt:
            prompt += ' '

        try:
            result = input(prompt)
            if not history and result and sys.stdin.isatty():
                readline.remove_history_item(readline.get_current_history_length() - 1)
        finally:
            # Always undo the per-call settings, also on Ctrl-C / EOF.
            if default:
                try:
                    readline.set_pre_input_hook(None)
                except AttributeError:
                    pass
            if not complete and old_completion:
                self.enable_completion()
        return result
class Config:
    """Client configuration.

    Values set at runtime through the setters take precedence over the
    [global] section of the config files, which in turn takes precedence over
    the module-level defaults. Aliases come from ALIASES and the [alias]
    section.
    """

    def __init__(self):
        """Read the system and user config files and initialise overrides."""
        self.__cp = RawConfigParser()
        self.__cp.read(SYSTEM_CONFIGFILE)
        self.__cp.read(os.path.expanduser(USER_CONFIGFILE))
        self.__aliases = {}
        self.__protocol = None
        self.__port = None
        self.__server = None
        self.__user = None
        self.__auth_user = None
        self.__password = None
        self.__use_pager = None
        self.__timezone = None
        self.__token = None
        # Quiet by default when input is not a terminal.
        self.__quiet = not sys.stdin.isatty()
        self.__aliases.update(ALIASES)
        self._load_file_aliases()

    def _load_file_aliases(self):
        """Copy the parser's [alias] section into the alias table."""
        if self.__cp.has_section('alias'):
            for key in self.__cp.options('alias'):
                self.__aliases[key] = self.__cp.get('alias', key)

    def set_quiet(self, quiet):
        self.__quiet = quiet

    def is_quiet(self):
        return self.__quiet

    def set_password(self, password):
        self.__password = password

    def get_password(self):
        """Return the password set at runtime, else the configured one, else None."""
        if self.__password is not None:
            return self.__password
        if self.__cp.has_option('global', 'password'):
            return self.__cp.get('global', 'password')
        return None

    def set_server(self, server):
        self.__server = server

    def get_server(self):
        if self.__server:
            return self.__server
        if self.__cp.has_option('global', 'server'):
            return self.__cp.get('global', 'server')
        return DEFAULT_SERVERNAME

    def set_port(self, port):
        self.__port = port

    def get_port(self):
        """Return the port.

        NOTE: the type depends on the source: whatever was passed to
        set_port(), an int from the config file, or the DEFAULT_PORT string.
        """
        if self.__port:
            return self.__port
        if self.__cp.has_option('global', 'port'):
            return self.__cp.getint('global', 'port')
        return DEFAULT_PORT

    def set_protocol(self, protocol):
        self.__protocol = protocol

    def get_protocol(self):
        if self.__protocol:
            return self.__protocol
        if self.__cp.has_option('global', 'protocol'):
            return self.__cp.get('global', 'protocol')
        return DEFAULT_PROTOCOL

    def get_history(self):
        """Return the expanded path of the readline history file."""
        if self.__cp.has_option('global', 'history'):
            hist = self.__cp.get('global', 'history')
        else:
            hist = USER_HISTORYFILE
        return os.path.expanduser(hist)

    def set_user(self, user):
        self.__user = user

    def get_user(self):
        """Return the user set at runtime, else the configured one, else the login name."""
        if self.__user is not None:
            return self.__user
        if self.__cp.has_option('global', 'username'):
            return self.__cp.get('global', 'username')
        return pwd.getpwuid(os.getuid())[0]

    def set_auth_user(self, user):
        self.__auth_user = user

    def get_auth_user(self):
        if not self.__auth_user:
            return "Anonymous"
        return self.__auth_user

    def set_use_pager(self, use_pager):
        """Set the pager policy; raises ValueError for an unknown policy."""
        values = [
            'always',
            'never',
            'horizontal',
            'vertical',
            'both'
        ]
        if use_pager not in values:
            raise ValueError("use_pager must be in {0}".format(', '.join(values)))
        self.__use_pager = use_pager

    def get_use_pager(self):
        """Return the pager policy.

        Without a runtime override the policy is 'never' when stdin is not a
        terminal, else the configured value (not validated), else 'both'.
        """
        if self.__use_pager is not None:
            return self.__use_pager
        if not sys.stdin.isatty():
            return 'never'
        if self.__cp.has_option('global', 'use_pager'):
            return self.__cp.get('global', 'use_pager')
        return 'both'

    def get_aliases(self):
        return self.__aliases

    def set_alias(self, name, *values):
        """Define alias *name* as the space-joined *values* and save the config."""
        self.__aliases[name] = ' '.join(values)
        if not self.__cp.has_section('alias'):
            self.__cp.add_section('alias')
        for alias, definition in self.__aliases.items():
            self.__cp.set('alias', alias, definition)
        self.write_config()

    def format_aliases(self, alias=None):
        """Return one alias (or an error text) or, without *alias*, all aliases."""
        aliases = self.get_aliases()
        if alias:
            if alias in aliases.keys():
                return '{} => {}'.format(alias, aliases[alias])
            return "ERROR: Unknown alias '{}'!".format(alias)
        return '\n'.join(
            '{} => {}'.format(name, definition) for name, definition in aliases.items()
        )

    def set_timezone(self, timezone):
        """Set the time zone; unknown names are ignored with a warning."""
        if timezone in pytz.all_timezones:
            self.__timezone = timezone
        else:
            logging.warning("Unknown timezone: {}".format(timezone))

    def get_timezone(self):
        """Return the time zone; an unknown configured name falls back to TIME_ZONE."""
        if self.__timezone:
            return self.__timezone
        if self.__cp.has_option('global', 'timezone'):
            timezone = self.__cp.get('global', 'timezone')
            if timezone in pytz.all_timezones:
                return timezone
            logging.warning("Unknown timezone: {}".format(timezone))
        return TIME_ZONE

    def set_token(self, token):
        self.__token = token

    def get_token(self):
        if self.__token:
            return self.__token
        if self.__cp.has_option('global', 'token'):
            return self.__cp.get('global', 'token')
        return None

    def write_config(self):
        """Write the current parser content to the user config file."""
        path = os.path.expanduser(USER_CONFIGFILE)
        # The containing directory (e.g. ~/.config) may not exist yet.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w') as f:
            self.__cp.write(f)

    def read_config(self, f: str):
        """Additionally read config file *f*, including its [alias] section."""
        self.__cp.read(f)
        # Aliases are cached in a dict, so pick up any the new file defines.
        self._load_file_aliases()
class Orthos:
    """The command line client: configuration, API access and command loop."""

    class API:
        """JSON-over-HTTP access to the server API."""

        def request_token(self, username, password):
            """Request token from API for user authentication."""
            url = API_URL + '/login'
            credentials = {
                'username': username,
                'password': password
            }
            header, data = self.request('POST', url, credentials, login=True)
            orthos.config.set_token(data.get('token', None))

        def get(self, url):
            """Send GET request to given URL.

            Any failure is reported on stdout and terminates the program
            with exit status 1.
            """
            response = None
            headers = {}
            if self.get_token():
                headers['Authorization'] = 'Token ' + self.get_token()
            try:
                request = urllib_request.Request(url, headers=headers)
                response = urllib_request.urlopen(request)
            except Exception as e:
                logging.debug(e)
                print("ERROR: {0} ({1})".format(str(e), API_URL))
                sys.exit(1)
            return response

        def post(self, url, data=None, login=False):
            """Send POST request with *data* as JSON body to given URL.

            Any failure is reported on stdout and terminates the program
            with exit status 1. For login requests the password is masked in
            everything that is logged or printed.
            """
            response = None
            json_data = json.dumps(data)
            if login and isinstance(data, dict) and 'password' in data:
                # Never echo credentials to the terminal or the debug log.
                printable_data = json.dumps(dict(data, password='********'))
            else:
                printable_data = json_data
            logging.debug(printable_data)
            headers = {'Content-Type': 'application/json'}
            if self.get_token():
                headers['Authorization'] = 'Token ' + self.get_token()
            try:
                request = urllib_request.Request(
                    url,
                    str(json_data).encode('utf-8'),
                    headers=headers
                )
                response = urllib_request.urlopen(request)
            except Exception as e:
                logging.debug(e)
                print("\nERROR: Internal server Error, please double check submitted data and report:")
                print("Bad Data:\n{0}\nURL:{1}\n".format(printable_data, API_URL))
                sys.exit(1)
            return response

        def request(self, method, url, data=None, login=False):
            """Send a request and return ``(header, data)`` of the JSON reply.

            'GET' is sent with get(), anything else with post(). For login
            requests the header is None and the whole reply is the data.
            Without a response ``(None, {})`` is returned.
            """
            logging.debug(url)
            if not orthos.config.is_quiet():
                print(PROMPT_LOADING, end='\r')

            if method == 'GET':
                response = self.get(url)
            else:
                response = self.post(url, data, login)

            if not response:
                return (None, {})
            result = json.loads(response.read().decode('utf-8'))

            if login:
                header = None
                data = result
            else:
                header = result.get('header', None)
                data = result.get('data', [])

            if not orthos.config.is_quiet():
                # Overwrite the loading text with blanks.
                print(' ' * len(PROMPT_LOADING), end='\r')
            return (header, data)

        def get_token(self):
            return orthos.config.get_token()

    def __init__(self):
        """Initialize orthos object."""
        # Replaces the nested class by an instance of it on this object.
        self.API = self.API()
        self.config = Config()
        self.Terminal = Terminal()

    def init(self):
        """Collect available commands from server and print welcome message."""
        global API_URL

        if orthos.config.get_protocol() != "https":
            logging.warning("No secure ssl connection, try -H https://<host> or -P 443")

        API_URL = API_URL_FORMAT.format(
            orthos.config.get_protocol(),
            orthos.config.get_server(),
            orthos.config.get_port()
        )

        header, data = self.API.request('GET', API_URL)
        orthos.config.set_auth_user(data.get('user'))

        self._commands = [
            Command(command, values) for command, values in data['commands'].items()
        ]

        self._add_client_command(
            'alias',
            help='Define own aliases.',
            docstring="""Define or display aliases. The command can be called without any
arguments, then it displays all available aliases. If it's called
with one argument, then it displays the definition of a specific
alias. If it is called with more than two arguments, then you
can define new aliases.
To execute an alias, type the alias name with a leading '@'.
Usage:
ALIAS [alias] [*args]
Arguments:
alias - Alias name.
*args - Valid command string.
Example:
ALIAS
ALIAS allmachines query name, ipv4 where name =~ foobar
ALIAS allmachines
@allmachines
"""
        )
        self._add_client_command(
            'auth',
            help='Request authorisation manually.'
        )
        self._add_client_command(
            'exit',
            help='Exit program.'
        )
        self._add_client_command(
            'config',
            help='Show connection/shell configurations.'
        )
        self._add_client_command(
            'help',
            [str(command) for command in self._commands],
            help='Provides help.'
        )

        self.username = self.config.get_user()
        if not self.username:
            # The command line option is -U (upper case); see main().
            print("Empty username - Use -U/--user or check %s or %s" % (USER_CONFIGFILE, SYSTEM_CONFIGFILE))
            sys.exit(1)

        self.recent_command = None

        commands = {}
        for command in self._commands:
            commands[str(command)] = command.get_tabcompletion()
        for alias in orthos.config.get_aliases():
            commands['@' + alias] = []
        self.Tabcompleter = TabCompleter(commands)

        message = data.get('message', None)
        if message and not orthos.config.is_quiet():
            self.Terminal.show(message)

    def _add_client_command(self, command, tabcompletion=None, help=None, docstring=None):
        """Add client command for tab completion."""
        # A fresh list per command instead of a shared mutable default.
        if tabcompletion is None:
            tabcompletion = []
        attributes = {'tabcompletion': tabcompletion}
        if help:
            attributes['help'] = help
        if docstring:
            attributes['docstring'] = docstring
        self._commands.append(Command(command, attributes))

    def get_next_user_command(self, prompt=None):
        """Read the next command line and return it as a list of tokens.

        The line is split on spaces outside of quotes; spaces inside a token
        are replaced by '%20' and empty tokens are dropped. End of input
        yields ['QUIT']. The result is an empty list after Ctrl-C or for a
        line that contains only spaces.
        """
        line = ''
        try:
            while not line:
                line = self.LineReader.readline(prompt=prompt)
        except KeyboardInterrupt:
            pass
        except EOFError:
            if not orthos.config.is_quiet():
                print()
            return ['QUIT']

        result = [
            token.replace(' ', '%20')
            for token in re.split(''' (?=(?:[^'"]|'[^']*'|"[^"]*")*$)''', line)
        ]

        # handle command line redirection
        # NOTE(review): the Terminal class in this file defines neither
        # set_output_filter() nor set_output_file(), so these branches raise
        # AttributeError, which run() reports as an invalid number of
        # arguments. Confirm whether redirection is implemented elsewhere.
        if '|' in result:
            idx = result.index('|')
            self.Terminal.set_output_filter(result[idx + 1:])
            result = result[0:idx]
        elif '|>' in result:
            idx = result.index('|>')
            self.Terminal.set_output_file(result[idx + 1], False)
            result = result[0:idx]
        elif '|>>' in result:
            idx = result.index('|>>')
            self.Terminal.set_output_file(result[idx + 1], True)
            result = result[0:idx]

        return [token for token in result if token != '']

    def get_password(self):
        """Prompt for password and return it (None if the prompt was aborted)."""
        password = None
        try:
            password = getpass.getpass("Orthos password for {0}: ".format(self.username))
        except (KeyboardInterrupt, EOFError):
            pass
        return password

    def authenticate(self):
        """Authenticate the user.

        Uses the configured password if there is one, otherwise prompts for
        it. On success the authenticated user is stored and the prompt is
        updated.
        """
        password = orthos.config.get_password()

        if password is not None:
            self.API.request_token(self.username, password)
            if not self.API.get_token():
                print("ERROR: Authorization error!")
                if self.recent_command:
                    self.recent_command = None
                return None
        else:
            password = self.get_password()
            if not password:
                print("\nAborted by user!")
                if self.recent_command:
                    self.recent_command = None
                return None
            self.API.request_token(self.username, password)
            if not self.API.get_token():
                print("ERROR: Authorization error!")
                return

        self.config.set_auth_user(self.username)
        self.LineReader.set_prompt(get_prompt(self.config.get_auth_user()))

    def is_authenticated(self):
        """Check if the user is already authenticated (token set)."""
        return bool(self.API.get_token())

    def print_help(self, arguments=None):
        """Print help for the first word of *arguments*, or the command overview."""
        arguments = (arguments or '').strip()
        if arguments:
            argument = arguments.split()[0]
            if argument and argument in self._commands:
                command = self._commands[self._commands.index(argument)]
                print(command.get_docstring())
            else:
                print("ERROR: Unknown command!")
        else:
            print("Commands are:")
            print()
            for command in self._commands:
                print("\t{0:<20} {1}".format(str(command).upper(), command.get_help()))
            print()

    def print_config(self):
        """Print the connection settings currently in effect."""
        print("")
        print("User:\t\t%s" % self.config.get_auth_user())
        print("Server:\t\t%s" % self.config.get_server())
        print("Port:\t\t%s" % self.config.get_port())
        print("Protocol:\t%s" % self.config.get_protocol())
        print("Timezone:\t%s" % self.config.get_timezone())
        print("")

    def run(self):
        """Run the command loop until QUIT/EXIT or end of input."""
        self.LineReader = OrthosLineReader(
            self,
            history=self.config.get_history()
        )
        if sys.stdin.isatty():
            self.LineReader.set_prompt(get_prompt(self.config.get_auth_user()))

        while True:
            try:
                if self.recent_command:
                    # Either an expanded alias (list) or a Command that has
                    # to be repeated, e.g. after authentication.
                    if isinstance(self.recent_command, list):
                        input_ = self.recent_command
                    else:
                        input_ = self.recent_command.as_input()
                    self.recent_command = None
                else:
                    input_ = self.get_next_user_command()

                if not input_:
                    # Ctrl-C at the prompt or a line of blanks: nothing to
                    # run. Treated like the empty command below.
                    print()
                    continue

                if '$USERNAME' in input_:
                    input_ = list(
                        map(lambda x: x.replace('$USERNAME', self.config.get_user()), input_)
                    )

                raw_command = input_[0].lstrip('@')
                command = raw_command.upper()
                arguments = input_[1:]
                raw_arguments = ' '.join(arguments)
                arguments_length = len(arguments)

                if command == 'AUTH':
                    self.authenticate()
                    continue
                elif command == 'HELP':
                    self.print_help(raw_arguments)
                    continue
                elif command in {'QUIT', 'EXIT'}:
                    return
                elif command in {'CONFIG'}:
                    self.print_config()
                    continue
                elif command == 'ALIAS':
                    if arguments_length == 0:
                        self.Terminal.show(orthos.config.format_aliases())
                    elif arguments_length == 1:
                        self.Terminal.show(orthos.config.format_aliases(arguments[0]))
                    else:
                        orthos.config.set_alias(arguments[0], *arguments[1:])
                    continue
                elif command in self._commands:
                    command = self._commands[self._commands.index(command)]
                    if command.get_matching_argument_list(arguments_length) is None:
                        raise AttributeError(command.get_name())
                    response = command.send(self, raw_arguments)
                    # A reply without header cannot be turned into an
                    # APIResponse; same check as in Input.process().
                    if response and response[0]:
                        apiresponse = APIResponse(*response)
                        self.recent_command = command
                        result = apiresponse.process()
                        # Only a returned Command asks for a repetition.
                        if not isinstance(result, Command):
                            self.recent_command = None
                    continue
                elif raw_command in orthos.config.get_aliases():
                    self.recent_command = orthos.config.get_aliases()[raw_command].split(' ')
                    self.recent_command += arguments
                    continue
                elif command == '':
                    print()
                else:
                    print("Invalid command.")
            except AttributeError as e:
                print("ERROR: Invalid number of arguments. "
                      "Type 'help {}' for more information.".format(e))
            except KeyboardInterrupt:
                pass
def main():
    """Main function.

    Creates the global Orthos instance, applies the command line options to
    its configuration, fetches the command list from the server and runs the
    command loop.
    """
    global orthos
    global PLAIN_OUTPUT
    global IFS

    orthos = Orthos()

    parser = argparse.ArgumentParser(description='Orthos command line interface.')
    parser.add_argument(
        '-H',
        '--host',
        dest='servername',
        metavar='HOST',
        help='use the hostname specified on the command line instead of the one in the config file'
    )
    parser.add_argument(
        '-c',
        '--config',
        dest='config_file',
        help='Additionally read this file as config file after reading %s and %s' % (SYSTEM_CONFIGFILE, USER_CONFIGFILE)
    )
    parser.add_argument(
        '-P',
        '--port',
        dest='port',
        metavar='PORT',
        # Validated by argparse: a non-numeric port is a usage error
        # instead of an uncaught ValueError.
        type=int,
        help='use the port specified on the command line instead of the one in the config file'
    )
    parser.add_argument(
        '-U',
        '--user',
        dest='user',
        metavar='USER',
        help='use the username specified'
    )
    parser.add_argument(
        '--password',
        dest='password',
        metavar='PASSWORD',
        help='use this password for automatic authentication (e.g. for scripting)'
    )
    parser.add_argument(
        '--token',
        dest='token',
        metavar='TOKEN',
        help='use this token for automatic authentication (e.g. for scripting); '
             '-U/--password options will be ignored'
    )
    parser.add_argument(
        '-D',
        '--debug',
        dest='debug',
        action='store_const',
        const=True,
        default=DEBUG,
        help='write debugging output'
    )
    parser.add_argument(
        '-L',
        '--logfile',
        dest='logfile',
        default=False,
        metavar='FILE',
        help='use that together with -D to log the debug output in a file rather than the console'
    )
    parser.add_argument(
        '--no-pager',
        dest='no_pager',
        const=True,
        default=False,
        action='store_const',
        help='do not use pager when showing results'
    )
    parser.add_argument(
        '-p',
        '--plain-output',
        dest='plain_output',
        const=True,
        default=PLAIN_OUTPUT,
        action='store_const',
        help='print plain output (e.g. for scripting)'
    )
    parser.add_argument(
        '-F',
        '--ifs',
        dest='ifs',
        default=IFS,
        metavar='IFS',
        help='set internal field separator (only useful in combination with -p; default is $OIFS)'
    )
    parser.add_argument(
        '-q',
        '--quiet',
        dest='quiet',
        const=True,
        action='store_const',
        help='makes command line client quiet'
    )
    parser.add_argument(
        '-v',
        '--version',
        dest='version',
        action='store_const',
        const=True,
        default=False,
        help='print version output'
    )
    parser.add_argument(
        '--timezone',
        dest='timezone',
        metavar='TZ',
        help='set the local time zone (default is "Europe/Berlin")'
    )
    options = parser.parse_args()

    if options.config_file:
        orthos.config.read_config(options.config_file)

    if options.debug:
        log_settings = {
            'level': logging.DEBUG,
            'format': '%(asctime)s %(levelname)s %(message)s',
        }
        if options.logfile:
            log_settings['filename'] = options.logfile
        logging.basicConfig(**log_settings)
        logging.info('Debugging enabled')

    if options.servername:
        # Accepts 'host' or 'protocol://host'; a protocol also selects the
        # matching standard port.
        servername = options.servername.split('://')
        if len(servername) == 2:
            protocol = servername[0].lower()
            orthos.config.set_protocol(protocol)
            orthos.config.set_server(servername[1].lower())
            if protocol == 'https':
                orthos.config.set_port(443)
            elif protocol == 'http':
                orthos.config.set_port(80)
        elif len(servername) == 1:
            orthos.config.set_server(servername[0].lower())

    if options.port is not None:
        # An explicit port also decides the protocol: 443 means https,
        # everything else http.
        port = options.port
        orthos.config.set_port(port)
        if port == 443:
            orthos.config.set_protocol("https")
        else:
            orthos.config.set_protocol("http")

    if options.user:
        orthos.config.set_user(options.user)
    if options.password:
        orthos.config.set_password(options.password)
    if options.token:
        orthos.config.set_token(options.token)
    if options.no_pager:
        orthos.config.set_use_pager('never')
    if options.plain_output:
        PLAIN_OUTPUT = True
    if options.ifs:
        IFS = options.ifs
    if options.quiet:
        orthos.config.set_quiet(True)
    if options.version:
        print(VERSION)
        sys.exit(0)
    if options.timezone:
        orthos.config.set_timezone(options.timezone)

    orthos.init()
    try:
        orthos.run()
    except Exception as e:
        # Top-level boundary: report the error and fall through to the
        # farewell below.
        print(e)
    finally:
        if orthos.config.is_quiet():
            print()
        else:
            print("Good bye, have a lot of fun...")
# Run the client only when this file is executed as a script.
if __name__ == '__main__':
    main()
07070100000001000081A400000000000000000000000161D8322100000044000000000000000000000000000000000000002400000000orthos2-1.2.31+git.5a38e82/orthosrc[global]
username =
server =
port = 443
protocol = https
token =
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!93 blocks