From 308294fd65f5a830b37f7fe1829dc35dc6d309fe Mon Sep 17 00:00:00 2001 From: Jonas Gunz Date: Wed, 21 Jun 2023 22:05:20 +0200 Subject: python module restructure --- automato/endpoint.py | 114 ---------------------------------- automato/endpoint/__init__.py | 114 ++++++++++++++++++++++++++++++++++ automato/state.py | 130 --------------------------------------- automato/state/__init__.py | 130 +++++++++++++++++++++++++++++++++++++++ automato/transport.py | 122 ------------------------------------ automato/transport/__init__.py | 122 ++++++++++++++++++++++++++++++++++++ automato/trigger.py | 134 ---------------------------------------- automato/trigger/__init__.py | 136 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 502 insertions(+), 500 deletions(-) delete mode 100644 automato/endpoint.py create mode 100644 automato/endpoint/__init__.py delete mode 100644 automato/state.py create mode 100644 automato/state/__init__.py delete mode 100644 automato/transport.py create mode 100644 automato/transport/__init__.py delete mode 100644 automato/trigger.py create mode 100644 automato/trigger/__init__.py diff --git a/automato/endpoint.py b/automato/endpoint.py deleted file mode 100644 index c1abe4e..0000000 --- a/automato/endpoint.py +++ /dev/null @@ -1,114 +0,0 @@ -import logging -logger = logging.getLogger(__name__) - -from . import transport - -def import_class(cl): - d = cl.rfind(".") - classname = cl[d+1:len(cl)] - m = __import__(cl[0:d], globals(), locals(), [classname]) - return getattr(m, classname) - - # Master object -class Endpoint: - def __init__(self, name: str, config: dict): - transports = {} - commands = {} - states = {} - - endpoint_info = config.get('info', {}) - - # sweet mother of jesus, you are ugly - for tp_key in config['transports']: - tp_cfg = config['transports'][tp_key] - logger.debug(f'loading transport "{tp_key}"') - - # TODO Handle failure - tp_class = import_class(tp_cfg['class']) - del tp_cfg['class'] - - transports[tp_key] = tp_class(endpoint_info, **tp_cfg) - - for cmd_key in config['commands']: - cmd_cfg = config['commands'][cmd_key] - logger.debug(f'loading command "{cmd_key}"') - - # TODO Handle failure - cmd_class = import_class(cmd_cfg['class']) - del cmd_cfg['class'] - - if 'transport' in cmd_cfg: - if cmd_cfg['transport'] not in transports: - # TODO should we be lenient with errors? - logger.error(f'transport "{cmd_cfg["transport"]}" for command "{cmd_key}" was not found.') - continue - - tp = transports[cmd_cfg['transport']] - del cmd_cfg['transport'] - cmd_cfg['transport'] = tp - - commands[cmd_key] = cmd_class(endpoint_info, **cmd_cfg) - - # you look familiar - for stt_key in config['states']: - stt_cfg = config['states'][stt_key] - logger.debug(f'loading state "{stt_key}"') - - # TODO Handle failure - stt_class = import_class(stt_cfg['class']) - del stt_cfg['class'] - - if stt_cfg['transport'] not in transports: - # TODO should we be lenient with errors? - logger.error(f'transport "{stt_cfg["transport"]}" for command "{stt_key}" was not found.') - continue - - if 'transport' in stt_cfg: - tp = transports[stt_cfg['transport']] - del stt_cfg['transport'] - stt_cfg['transport'] = tp - - states[stt_key] = stt_class(endpoint_info, **stt_cfg) - - # TODO How does the init step look like? Do it here? - # transports prbly need to be connected here - - self._name = name - self._transports = transports - self._commands = commands - self._states = states - - def connectTransport(self): - for k in self._transports: - if self._transports[k].CONNECTION == transport.HOLD: - self._transports[k].connect() - elif self._transports[k].CONNECTION == transport.THROWAWAY: - self._transports[k].check() - else: - logger.error(f'"{self._transports[k].CONNECTION}" is an unknown connection type in transport "{k}"') - - # forces a recollect of all states. should not be needed, states should - # handle that themselves via TTL - # we shouldn't need it - #def collectState(self): - # # TODO we need a interface here - # for k in self._states: - # self._states[k].collect() - - # Format: . - def getState(self, state_key: str): - state, key = state_key.split('.', 1) - - if state not in self._states: - logger.error(f'State "{state}" was not found for "{self._name}"') - return None - - return self._states[state].get(key) - - - def executeCommand(self, cmd: str, **kwargs): - if cmd not in self._commands: - logger.error(f'Command "{cmd}" is not defined for "{self._name}"') - #raise Exception(f'Command "{cmd}" is not defined for "{self._name}"') - - self._commands[cmd].execute(**kwargs) diff --git a/automato/endpoint/__init__.py b/automato/endpoint/__init__.py new file mode 100644 index 0000000..770c8fa --- /dev/null +++ b/automato/endpoint/__init__.py @@ -0,0 +1,114 @@ +import logging +logger = logging.getLogger(__name__) + +from automato import transport + +def import_class(cl): + d = cl.rfind(".") + classname = cl[d+1:len(cl)] + m = __import__(cl[0:d], globals(), locals(), [classname]) + return getattr(m, classname) + + # Master object +class Endpoint: + def __init__(self, name: str, config: dict): + transports = {} + commands = {} + states = {} + + endpoint_info = config.get('info', {}) + + # sweet mother of jesus, you are ugly + for tp_key in config['transports']: + tp_cfg = config['transports'][tp_key] + logger.debug(f'loading transport "{tp_key}"') + + # TODO Handle failure + tp_class = import_class(tp_cfg['class']) + del tp_cfg['class'] + + transports[tp_key] = tp_class(endpoint_info, **tp_cfg) + + for cmd_key in config['commands']: + cmd_cfg = config['commands'][cmd_key] + logger.debug(f'loading command "{cmd_key}"') + + # TODO Handle failure + cmd_class = import_class(cmd_cfg['class']) + del cmd_cfg['class'] + + if 'transport' in cmd_cfg: + if cmd_cfg['transport'] not in transports: + # TODO should we be lenient with errors? + logger.error(f'transport "{cmd_cfg["transport"]}" for command "{cmd_key}" was not found.') + continue + + tp = transports[cmd_cfg['transport']] + del cmd_cfg['transport'] + cmd_cfg['transport'] = tp + + commands[cmd_key] = cmd_class(endpoint_info, **cmd_cfg) + + # you look familiar + for stt_key in config['states']: + stt_cfg = config['states'][stt_key] + logger.debug(f'loading state "{stt_key}"') + + # TODO Handle failure + stt_class = import_class(stt_cfg['class']) + del stt_cfg['class'] + + if stt_cfg['transport'] not in transports: + # TODO should we be lenient with errors? + logger.error(f'transport "{stt_cfg["transport"]}" for command "{stt_key}" was not found.') + continue + + if 'transport' in stt_cfg: + tp = transports[stt_cfg['transport']] + del stt_cfg['transport'] + stt_cfg['transport'] = tp + + states[stt_key] = stt_class(endpoint_info, **stt_cfg) + + # TODO How does the init step look like? Do it here? + # transports prbly need to be connected here + + self._name = name + self._transports = transports + self._commands = commands + self._states = states + + def connectTransport(self): + for k in self._transports: + if self._transports[k].CONNECTION == transport.HOLD: + self._transports[k].connect() + elif self._transports[k].CONNECTION == transport.THROWAWAY: + self._transports[k].check() + else: + logger.error(f'"{self._transports[k].CONNECTION}" is an unknown connection type in transport "{k}"') + + # forces a recollect of all states. should not be needed, states should + # handle that themselves via TTL + # we shouldn't need it + #def collectState(self): + # # TODO we need a interface here + # for k in self._states: + # self._states[k].collect() + + # Format: . + def getState(self, state_key: str): + state, key = state_key.split('.', 1) + + if state not in self._states: + logger.error(f'State "{state}" was not found for "{self._name}"') + return None + + return self._states[state].get(key) + + + def executeCommand(self, cmd: str, **kwargs): + if cmd not in self._commands: + logger.error(f'Command "{cmd}" is not defined for "{self._name}"') + #raise Exception(f'Command "{cmd}" is not defined for "{self._name}"') + + self._commands[cmd].execute(**kwargs) diff --git a/automato/state.py b/automato/state.py deleted file mode 100644 index 5d2789c..0000000 --- a/automato/state.py +++ /dev/null @@ -1,130 +0,0 @@ -import time -import logging -logger = logging.getLogger(__name__) - -from . import transport - -''' -Implementations of State: - -MUST implement: - _collect(self) - -CAN implement: - _get(self, key: str) - init(self, [transport], ) - -SHOULDNT implement: - get(self, key) - collect(self) - __init__(self, endpoint_info: dict, ttl: int = 30, **kwargs) - -Data is stored in self._data as a dictionary. -By default, _get(key) retrieves the returns self._data[key]. -This behaviour can be overridden by implementing a own _get(). - -If using the default _get(), _collect() has to store data in -the self._data dictionary. If an own _get() is implemented, -this does not need to be the case. -''' -class State: - # TODO set default TTL in child classes - def __init__(self, endpoint_info: dict, ttl: int = 30, **kwargs): - self._ttl = ttl - self._endpoint_info = endpoint_info - - self._data = {} - self._last_collected = 0 - - self._init(**kwargs) - - def _init(self): - pass - - def _collect(self): - raise NotImplemented - - def _get(self, key: str): - if key not in self._data: - logger.error(f'Data key {key} was not found.') - return None - - return self._data[key] - - def _shouldCollect(self): - return time.time() - self._last_collected > self._ttl - - def get(self, key: str): - if self._shouldCollect(): - logger.debug(f'Cached value for "{key}" is too old. refreshing.') - self.collect() - else: - logger.debug(f'Using cached value for "{key}".') - - - return self._get(key) - - # Force datacollection. not really needed - def collect(self): - self._collect() - self._last_collected = time.time() - -class UserSessionState(State): - def _init(self, transport: transport.SshTransport): - self._transport = transport - - def _get(self, key: str): - if key not in self._data: - return 0 - - return self._data[key] - - def _collect(self): - data = self._transport.execHandleStderror('who').decode('utf-8') - # TODO error handling - lines = data.split('\n') - - self._data = {} - - for l in lines: - name, _ = l.split(' ', 1) - - logger.debug(f'Found user session {name}') - - if name not in self._data: - self._data[name] = 0 - - self._data[name] += 1 - -class LinuxMemoryState(State): - def _init(self, transport: transport.SshTransport): - self._transport = transport - - def _collect(self): - mem_data = self._transport.execHandleStderror('cat /proc/meminfo').decode('utf-8') - - # TODO We prbly don't wan't raw values. Process them! - self._data['mem'] = {} - for l in mem_data.splitlines(): - arr = l.split() - key = arr[0].strip(':') - val = arr[1] - - self._data['mem'][key] = val - - logger.debug(f'Memory: {key} = {val}') - -class LinuxLoadState(State): - def _init(self, transport: transport.SshTransport): - self._transport = transport - - def _collect(self): - load_raw = self._transport.execHandleStderror('cat /proc/loadavg').decode('utf-8') - - data = load_raw.split(None,4) - - self._data = {} - - self._data['1'] = data[0] - self._data['5'] = data[1] - self._data['15'] = data[2] diff --git a/automato/state/__init__.py b/automato/state/__init__.py new file mode 100644 index 0000000..bbcb653 --- /dev/null +++ b/automato/state/__init__.py @@ -0,0 +1,130 @@ +import time +import logging +logger = logging.getLogger(__name__) + +from automato import transport + +''' +Implementations of State: + +MUST implement: + _collect(self) + +CAN implement: + _get(self, key: str) + init(self, [transport], ) + +SHOULDNT implement: + get(self, key) + collect(self) + __init__(self, endpoint_info: dict, ttl: int = 30, **kwargs) + +Data is stored in self._data as a dictionary. +By default, _get(key) retrieves the returns self._data[key]. +This behaviour can be overridden by implementing a own _get(). + +If using the default _get(), _collect() has to store data in +the self._data dictionary. If an own _get() is implemented, +this does not need to be the case. +''' +class State: + # TODO set default TTL in child classes + def __init__(self, endpoint_info: dict, ttl: int = 30, **kwargs): + self._ttl = ttl + self._endpoint_info = endpoint_info + + self._data = {} + self._last_collected = 0 + + self._init(**kwargs) + + def _init(self): + pass + + def _collect(self): + raise NotImplemented + + def _get(self, key: str): + if key not in self._data: + logger.error(f'Data key {key} was not found.') + return None + + return self._data[key] + + def _shouldCollect(self): + return time.time() - self._last_collected > self._ttl + + def get(self, key: str): + if self._shouldCollect(): + logger.debug(f'Cached value for "{key}" is too old. refreshing.') + self.collect() + else: + logger.debug(f'Using cached value for "{key}".') + + + return self._get(key) + + # Force datacollection. not really needed + def collect(self): + self._collect() + self._last_collected = time.time() + +class UserSessionState(State): + def _init(self, transport: transport.SshTransport): + self._transport = transport + + def _get(self, key: str): + if key not in self._data: + return 0 + + return self._data[key] + + def _collect(self): + data = self._transport.execHandleStderror('who').decode('utf-8') + # TODO error handling + lines = data.split('\n') + + self._data = {} + + for l in lines: + name, _ = l.split(' ', 1) + + logger.debug(f'Found user session {name}') + + if name not in self._data: + self._data[name] = 0 + + self._data[name] += 1 + +class LinuxMemoryState(State): + def _init(self, transport: transport.SshTransport): + self._transport = transport + + def _collect(self): + mem_data = self._transport.execHandleStderror('cat /proc/meminfo').decode('utf-8') + + # TODO We prbly don't wan't raw values. Process them! + self._data['mem'] = {} + for l in mem_data.splitlines(): + arr = l.split() + key = arr[0].strip(':') + val = arr[1] + + self._data['mem'][key] = val + + logger.debug(f'Memory: {key} = {val}') + +class LinuxLoadState(State): + def _init(self, transport: transport.SshTransport): + self._transport = transport + + def _collect(self): + load_raw = self._transport.execHandleStderror('cat /proc/loadavg').decode('utf-8') + + data = load_raw.split(None,4) + + self._data = {} + + self._data['1'] = data[0] + self._data['5'] = data[1] + self._data['15'] = data[2] diff --git a/automato/transport.py b/automato/transport.py deleted file mode 100644 index e5fd653..0000000 --- a/automato/transport.py +++ /dev/null @@ -1,122 +0,0 @@ -import paramiko -import logging - -from typing import Union - -logger = logging.getLogger(__name__) - -HOLD = 1 -THROWAWAY = 2 - -''' -Implementations of Transport: - -CAN set: - CONNECTION - - THROWAWAY signals that every invocation creates a new connection and - and thus connection management is not needed - - HOLD indicates a connection is established and held for multiple commands, - requiring initial connection and final disconnection - -MUST implement: - connect(self), disconnect(self) - when CONNECTION is set to HOLD - make sure to set self._connected accordingly. - check(self) - when CONNECTION is set to THROWAWAY - - Functions to use the Transport - it is advisable to specify the used Transport as a type hint in - the command/state using it to trigger errors on startup, - rather than at runtime in case of misconfiguration. - -CAN implement: - _init(self, ) - isConnected(self) -> bool - -SHOULDNT implement: - __init__(self, endpoint_info: dict, **kwargs) -''' -class Transport: - CONNECTION = HOLD - #CONNECTION = THROWAWAY - - def __init__(self, endpoint_info: dict, **kwargs): - self._endpoint_info = endpoint_info - self._connected = False - self._init(**kwargs) - - def _init(self): - pass - - # Connects to the transport, if CONNECTION == HOLD - def connect(self): - raise NotImplemented - - # disconnects to the transport, if CONNECTION == HOLD - def disconnect(self): - raise NotImplemented - - # validate that the transport works, if CONNECTION == THROWAWAY - def check(self): - raise NotImplemented - - def isConnected(self) -> bool: - return self._connected - - -class SshTransport(Transport): - CONNECTION=HOLD - - def _init(self, hostname: str, port=22, username='root', password = None, id_file = None, allow_agent = False): - self._hostname = hostname - self._port = port - self._username = username - self._password = password - self._id_file = id_file - self._allow_agent = allow_agent - - self._client = None - - def connect(self): - self._client = paramiko.SSHClient() - - # TODO known hosts - self._client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy) - try: - self._client.connect(self._hostname, port=self._port, username=self._username, password=self._password, key_filename=self._id_file, allow_agent=self._allow_agent) - self._connected = True - except paramiko.ssh_exception.NoValidConnectionsError as e: - logger.error(f'Failed to connect to {self._hostname}: {e.errors} ') - - - # return(str: stdout, str: stderr, int: retcode) - def exec(self, command: str): - if not self._connected: - # TODO we want a bit smarted connection logic - logger.error('SSH not connected') - raise Exception('Not connected') - - output = self._client.exec_command(command, timeout=5) - - retcode = output[1].channel.recv_exit_status() - return (output[1].read().strip(), output[2].read().strip(), retcode) - - def execHandleStderror(self, command: str): - out = self.exec(command) - - if out[2] != 0: - logger.error(f'Command returned error {out[2]}: {out[1]}') - raise Exception(f'Command returned error {out[2]}: {out[1]}') - - return out[0] - - def readFile(self, path: str): - return self.execHandleStderror(f'cat "{path}"') - - def disconnect(self): - if self._connected: - self._client.close() - - self._connected = False - self._client = None diff --git a/automato/transport/__init__.py b/automato/transport/__init__.py new file mode 100644 index 0000000..e5fd653 --- /dev/null +++ b/automato/transport/__init__.py @@ -0,0 +1,122 @@ +import paramiko +import logging + +from typing import Union + +logger = logging.getLogger(__name__) + +HOLD = 1 +THROWAWAY = 2 + +''' +Implementations of Transport: + +CAN set: + CONNECTION + - THROWAWAY signals that every invocation creates a new connection and + and thus connection management is not needed + - HOLD indicates a connection is established and held for multiple commands, + requiring initial connection and final disconnection + +MUST implement: + connect(self), disconnect(self) + when CONNECTION is set to HOLD + make sure to set self._connected accordingly. + check(self) + when CONNECTION is set to THROWAWAY + + Functions to use the Transport + it is advisable to specify the used Transport as a type hint in + the command/state using it to trigger errors on startup, + rather than at runtime in case of misconfiguration. + +CAN implement: + _init(self, ) + isConnected(self) -> bool + +SHOULDNT implement: + __init__(self, endpoint_info: dict, **kwargs) +''' +class Transport: + CONNECTION = HOLD + #CONNECTION = THROWAWAY + + def __init__(self, endpoint_info: dict, **kwargs): + self._endpoint_info = endpoint_info + self._connected = False + self._init(**kwargs) + + def _init(self): + pass + + # Connects to the transport, if CONNECTION == HOLD + def connect(self): + raise NotImplemented + + # disconnects to the transport, if CONNECTION == HOLD + def disconnect(self): + raise NotImplemented + + # validate that the transport works, if CONNECTION == THROWAWAY + def check(self): + raise NotImplemented + + def isConnected(self) -> bool: + return self._connected + + +class SshTransport(Transport): + CONNECTION=HOLD + + def _init(self, hostname: str, port=22, username='root', password = None, id_file = None, allow_agent = False): + self._hostname = hostname + self._port = port + self._username = username + self._password = password + self._id_file = id_file + self._allow_agent = allow_agent + + self._client = None + + def connect(self): + self._client = paramiko.SSHClient() + + # TODO known hosts + self._client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy) + try: + self._client.connect(self._hostname, port=self._port, username=self._username, password=self._password, key_filename=self._id_file, allow_agent=self._allow_agent) + self._connected = True + except paramiko.ssh_exception.NoValidConnectionsError as e: + logger.error(f'Failed to connect to {self._hostname}: {e.errors} ') + + + # return(str: stdout, str: stderr, int: retcode) + def exec(self, command: str): + if not self._connected: + # TODO we want a bit smarted connection logic + logger.error('SSH not connected') + raise Exception('Not connected') + + output = self._client.exec_command(command, timeout=5) + + retcode = output[1].channel.recv_exit_status() + return (output[1].read().strip(), output[2].read().strip(), retcode) + + def execHandleStderror(self, command: str): + out = self.exec(command) + + if out[2] != 0: + logger.error(f'Command returned error {out[2]}: {out[1]}') + raise Exception(f'Command returned error {out[2]}: {out[1]}') + + return out[0] + + def readFile(self, path: str): + return self.execHandleStderror(f'cat "{path}"') + + def disconnect(self): + if self._connected: + self._client.close() + + self._connected = False + self._client = None diff --git a/automato/trigger.py b/automato/trigger.py deleted file mode 100644 index 7acb100..0000000 --- a/automato/trigger.py +++ /dev/null @@ -1,134 +0,0 @@ -from typing import Dict -from pyparsing import alphanums, alphas, printables, pyparsing_common, pyparsing_common, Word, infix_notation, CaselessKeyword, opAssoc, ParserElement -import time -import logging -logger = logging.getLogger(__name__) - -from . import endpoint -from . import misc - -''' -Implementations of Trigger: - -MUST implement: - _evaluate(self, action: str) -> bool - evaluates the instace for action given by 'action'. - Provided configuration is stored in self._instances[action]['args'] - -CAN implement: - _addInstance(self, action: str) - Called afer 'action' was added. - -SHOULDNT implement: - evaluate(self, action: str) -> bool - Only calls _evaluate(), if no check was performed in configured interval, - otherwise returns cached result - addInstance(self, action:str, interval=30, **kwargs) -''' -class Trigger: - NEEDS_CONTEXT = False - - @staticmethod - def create(classname: str, **kwargs): - return misc.import_class(classname)(**kwargs) - - def __init__(self): - self._instances = {} - - def _addInstance(self, action: str): - pass - - def addInstance(self, action: str, interval: int=30, **kwargs): - self._instances[action] = {'lastupdate':0,'interval':interval,'last':False,'args':kwargs} - self._addInstance(action) - logger.debug(f'Trigger: Action "{action}" registered.') - - def _evaluate(self, action: str) -> bool: - raise NotImplemented - - def _shouldReevaluate(self, action: str) -> bool: - return time.time() - self._instances[action]['lastupdate'] > self._instances[action]['interval'] - - def evaluate(self, action: str) -> bool: - if action not in self._instances: - logger.error(f'Trigger: Action "{action}" was not found. Evaluating to False.') - return False - - if self._shouldReevaluate(action): - logger.debug(f'Re-evaluating trigger condition for action "{action}"') - result = self._evaluate(action) - - self._instances[action]['last'] = result - self._instances[action]['lastupdate'] = time.time() - return result - - return self._instances[action]['last'] - -''' -```yaml -conditional: - class: trigger.Conditional ---- -- conditional: - interval: 30 - when: - - host1.user.bob > 0 -``` -''' -class ConditionalTrigger(Trigger): - NEEDS_CONTEXT = True - - def __init__(self, endpoints: Dict[str, endpoint.Endpoint]): - super().__init__() - - self._endpoints = endpoints - self._setup_parser() - - def _setup_parser(self): - ParserElement.enable_packrat() - - boolean = CaselessKeyword('True').setParseAction(lambda x: True) | CaselessKeyword('False').setParseAction(lambda x: False) - integer = pyparsing_common.integer - variable = Word(alphanums + '.').setParseAction(self._parseVariable) - operand = boolean | integer | variable - - self._parser = infix_notation( - operand, - [ - ('not', 1, opAssoc.RIGHT, lambda a: not a[0][1]), - ('and', 2, opAssoc.LEFT, lambda a: a[0][0] and a[0][2]), - ('or', 2, opAssoc.LEFT, lambda a: a[0][0] or a[0][2]), - ('==', 2, opAssoc.LEFT, lambda a: a[0][0] == a[0][2]), - ('>', 2, opAssoc.LEFT, lambda a: a[0][0] > a[0][2]), - ('>=', 2, opAssoc.LEFT, lambda a: a[0][0] >= a[0][2]), - ('<', 2, opAssoc.LEFT, lambda a: a[0][0] < a[0][2]), - ('<=', 2, opAssoc.LEFT, lambda a: a[0][0] <= a[0][2]), - ('+', 2, opAssoc.LEFT, lambda a: a[0][0] + a[0][2]), - ('-', 2, opAssoc.LEFT, lambda a: a[0][0] - a[0][2]), - ('*', 2, opAssoc.LEFT, lambda a: a[0][0] * a[0][2]), - ('/', 2, opAssoc.LEFT, lambda a: a[0][0] / a[0][2]), - ] - ) - - def _parseVariable(self, var): - logger.debug(f'Looking up variable "{var[0]}"') - endpoint, key = var[0].split('.',1) - - if not endpoint in self._endpoints: - logger.error(f'Parser: Endpoint "{endpoint}" not found') - return None - - return self._endpoints[endpoint].getState(key) - - def _evaluate(self, action: str) -> bool: - logger.debug(f"{self._instances[action]['args']['when']}") - - results = [] - - for s in self._instances[action]['args']['when']: - r = self._parser.parse_string(str(s))[0] - logger.debug(f'Condition "{s}" evaluated to "{r}"') - results.append(r) - - return all(results) - diff --git a/automato/trigger/__init__.py b/automato/trigger/__init__.py new file mode 100644 index 0000000..a4b3213 --- /dev/null +++ b/automato/trigger/__init__.py @@ -0,0 +1,136 @@ +from typing import Dict +from pyparsing import alphanums, alphas, printables, pyparsing_common, pyparsing_common, Word, infix_notation, CaselessKeyword, opAssoc, ParserElement +import time +import logging +logger = logging.getLogger(__name__) + +from automato import endpoint +from automato import misc + + + +''' +Implementations of Trigger: + +MUST implement: + _evaluate(self, action: str) -> bool + evaluates the instace for action given by 'action'. + Provided configuration is stored in self._instances[action]['args'] + +CAN implement: + _addInstance(self, action: str) + Called afer 'action' was added. + +SHOULDNT implement: + evaluate(self, action: str) -> bool + Only calls _evaluate(), if no check was performed in configured interval, + otherwise returns cached result + addInstance(self, action:str, interval=30, **kwargs) +''' +class Trigger: + NEEDS_CONTEXT = False + + @staticmethod + def create(classname: str, **kwargs): + return misc.import_class(classname)(**kwargs) + + def __init__(self): + self._instances = {} + + def _addInstance(self, action: str): + pass + + def addInstance(self, action: str, interval: int=30, **kwargs): + self._instances[action] = {'lastupdate':0,'interval':interval,'last':False,'args':kwargs} + self._addInstance(action) + logger.debug(f'Trigger: Action "{action}" registered.') + + def _evaluate(self, action: str) -> bool: + raise NotImplemented + + def _shouldReevaluate(self, action: str) -> bool: + return time.time() - self._instances[action]['lastupdate'] > self._instances[action]['interval'] + + def evaluate(self, action: str) -> bool: + if action not in self._instances: + logger.error(f'Trigger: Action "{action}" was not found. Evaluating to False.') + return False + + if self._shouldReevaluate(action): + logger.debug(f'Re-evaluating trigger condition for action "{action}"') + result = self._evaluate(action) + + self._instances[action]['last'] = result + self._instances[action]['lastupdate'] = time.time() + return result + + return self._instances[action]['last'] + +''' +```yaml +conditional: + class: trigger.Conditional +--- +- conditional: + interval: 30 + when: + - host1.user.bob > 0 +``` +''' +class ConditionalTrigger(Trigger): + NEEDS_CONTEXT = True + + def __init__(self, endpoints: Dict[str, endpoint.Endpoint]): + super().__init__() + + self._endpoints = endpoints + self._setup_parser() + + def _setup_parser(self): + ParserElement.enable_packrat() + + boolean = CaselessKeyword('True').setParseAction(lambda x: True) | CaselessKeyword('False').setParseAction(lambda x: False) + integer = pyparsing_common.integer + variable = Word(alphanums + '.').setParseAction(self._parseVariable) + operand = boolean | integer | variable + + self._parser = infix_notation( + operand, + [ + ('not', 1, opAssoc.RIGHT, lambda a: not a[0][1]), + ('and', 2, opAssoc.LEFT, lambda a: a[0][0] and a[0][2]), + ('or', 2, opAssoc.LEFT, lambda a: a[0][0] or a[0][2]), + ('==', 2, opAssoc.LEFT, lambda a: a[0][0] == a[0][2]), + ('>', 2, opAssoc.LEFT, lambda a: a[0][0] > a[0][2]), + ('>=', 2, opAssoc.LEFT, lambda a: a[0][0] >= a[0][2]), + ('<', 2, opAssoc.LEFT, lambda a: a[0][0] < a[0][2]), + ('<=', 2, opAssoc.LEFT, lambda a: a[0][0] <= a[0][2]), + ('+', 2, opAssoc.LEFT, lambda a: a[0][0] + a[0][2]), + ('-', 2, opAssoc.LEFT, lambda a: a[0][0] - a[0][2]), + ('*', 2, opAssoc.LEFT, lambda a: a[0][0] * a[0][2]), + ('/', 2, opAssoc.LEFT, lambda a: a[0][0] / a[0][2]), + ] + ) + + def _parseVariable(self, var): + logger.debug(f'Looking up variable "{var[0]}"') + endpoint, key = var[0].split('.',1) + + if not endpoint in self._endpoints: + logger.error(f'Parser: Endpoint "{endpoint}" not found') + return None + + return self._endpoints[endpoint].getState(key) + + def _evaluate(self, action: str) -> bool: + logger.debug(f"{self._instances[action]['args']['when']}") + + results = [] + + for s in self._instances[action]['args']['when']: + r = self._parser.parse_string(str(s))[0] + logger.debug(f'Condition "{s}" evaluated to "{r}"') + results.append(r) + + return all(results) + -- cgit v1.2.3