From 17bd5b60a6d1445d208c1486c5b937c7f7881154 Mon Sep 17 00:00:00 2001 From: Jonas Gunz Date: Fri, 11 Dec 2020 00:23:25 +0100 Subject: rework --- phpipam/__init__.py | 156 ++------------------------------------------------- phpipam/backend.py | 76 +++++++++++++++++++++++++ phpipam/resources.py | 66 ++++++++++++++++++++++ 3 files changed, 147 insertions(+), 151 deletions(-) create mode 100755 phpipam/backend.py create mode 100755 phpipam/resources.py (limited to 'phpipam') diff --git a/phpipam/__init__.py b/phpipam/__init__.py index 08a2466..89d8280 100755 --- a/phpipam/__init__.py +++ b/phpipam/__init__.py @@ -1,17 +1,5 @@ - -import requests -import json -import datetime -from dateutil.parser import parse as datetime_parse - -class apiConnectionException(Exception): - pass - -class apiQueryException(Exception): - pass - -class apiObjectNotFoundException(Exception): - pass +from .backend import phpipamBackend +from .resources import phpipamResource class phpipam: """ @@ -22,141 +10,7 @@ class phpipam: """ def __init__(self, api_url, app_id, api_user, api_password): - """ - Parameters - ---------- - api_url : str - URL of the phpIPAM instance. Example: https://phpipam.example.com/ - app_id : str - AppID set in phpIPAM API settings - api_user : str - username, leave blank to use static token-authentication - api_password : str - user password or static auth token - - Raises - ------ - apiConnectionException - if the connection/authentification fails - """ - - self.api_url = api_url.strip('/') + '/api/' + app_id - self.api_user = api_user - self.api_password = api_password - - # Check for static auth - if len(self.api_user) == 0: - self.api_token = self.api_password - self.api_token_expires = "" - else: - self._getApiToken() - - def _req( self, method, url, data = {} ): - if self._isTokenExpired(): - self._getApiToken() - - return requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json() - - def _getApiToken(self): - data = requests.post(self.api_url + "/user", auth=(self.api_user,self.api_password)).json() - if not data['success']: - raise apiConnectionException('Failed to authenticate: ' + str(data['code'])) - - self.api_token = data['data']['token'] - self.api_token_expires = data['data']['expires'] - - - def _isTokenExpired(self): - # static auth does not expire - if len(self.api_token_expires) == 0: - return False - - expiration = datetime_parse(self.api_token_expires) - - return expiration < datetime.datetime.now() - - # TODO remove - def _checkTokenExpired(self): - data = self._req('GET', '/user/') - return data['success'] - - def _checkReq ( self, method, url, data = {} ): - """Wrapper for _req for checking result and only returning data""" - - data = self._req( method, url, data ) - - if not 'success' in data or not data['success']: - raise apiQueryException("Query failed with code " + str(data['code'] + ": " + str(['message']))) - - return data['data'] - - # - def getSections(self): - """ - Get the complete list of dictionaries describing sections - Returns: list of dicts - """ - - return self._checkReq('GET', '/sections/') - - def getSectionById(self, section_id): - """ - Get a section by id - - Parameters - ---------- - section_id : str - - Returns: dict - """ - - return self._checkReq('GET', f'/sections/{section_id}') - - - def getSectionByName(self, name): - """ - Find a section by name - - Parameters - ---------- - name : str - name of the section to search for - - Returns: dict - - Raises - ------ - apiObjectNotFoundException - if no section matches name - """ - - data = self.getSections() - - for section in data: - if 'name' in section and section['name'] == name: - return section - - raise apiObjectNotFoundException(f"Section {name} was not found.") - - def getSubnets(self, section_id): - """ - Get the complete list of dictionaries describing subnets of a section - - Parameters - ---------- - section_id : str - """ - - return self._checkReq('GET', f'/sections/{section_id}/subnets') - - def getSubnetById(self, subnet_id): - """ - Get a section by id - - Parameters - ---------- - subnet_id : str - """ - - return self._checkReq('GET', f'/subnets/{subnet_id}') + self._backend = phpipamBackend(api_url, app_id, api_user, api_password) + def __getattr__(self, item): + return phpipamResource(self._backend, item) diff --git a/phpipam/backend.py b/phpipam/backend.py new file mode 100755 index 0000000..81af316 --- /dev/null +++ b/phpipam/backend.py @@ -0,0 +1,76 @@ +import requests +import json +import datetime +from dateutil.parser import parse as datetime_parse + +class apiConnectionException(Exception): + pass + +class apiQueryException(Exception): + pass + +class apiObjectNotFoundException(Exception): + pass + +class phpipamBackend: + def __init__(self, api_url, app_id, api_user, api_password): + """ + Parameters + ---------- + api_url : str + URL of the phpIPAM instance. Example: https://phpipam.example.com/ + app_id : str + AppID set in phpIPAM API settings + api_user : str + username, leave blank to use static token-authentication + api_password : str + user password or static auth token + + Raises + ------ + apiConnectionException + if the connection/authentification fails + """ + + self.api_url = api_url.strip('/') + '/api/' + app_id + self.api_user = api_user + self.api_password = api_password + + # Check for static auth + if len(self.api_user) == 0: + self.api_token = self.api_password + self.api_token_expires = "" + else: + self._getApiToken() + + def _getApiToken(self): + data = requests.post(self.api_url + "/user", auth=(self.api_user,self.api_password)).json() + if not data['success']: + raise apiConnectionException('Failed to authenticate: ' + str(data['code'])) + + self.api_token = data['data']['token'] + self.api_token_expires = data['data']['expires'] + + + def _isTokenExpired(self): + # static auth does not expire + if len(self.api_token_expires) == 0: + return False + + expiration = datetime_parse(self.api_token_expires) + + return expiration < datetime.datetime.now() + + def request ( self, method, url, data = {} ): + """Wrapper for _req for checking result and only returning data""" + + if self._isTokenExpired(): + self._getApiToken() + + data = requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json() + + if not 'success' in data or not data['success']: + raise apiQueryException("Query failed with code " + str(data['code']) + ": " + str(data['message'])) + + return data['data'] + diff --git a/phpipam/resources.py b/phpipam/resources.py new file mode 100755 index 0000000..597560a --- /dev/null +++ b/phpipam/resources.py @@ -0,0 +1,66 @@ +from .backend import phpipamBackend + +resource_types = { + 'sections' : { + 'getSubnets':{ + 'method':'GET', + 'request':'/sections/{object_id}/subnets', + } + }, + 'subnets' : { + 'search':{ + 'method':'GET', + 'request':'/subnets/search/{search}' + }, + 'getIP':{ + 'method':'GET', + 'requests':'/subnets/' + } + }, + 'addresses' : { + }, + 'devices' : { + }, +} + +class invalidResourceException(Exception): + pass + +class invalidResourceOperationException(Exception): + pass + +class phpipamResourceFunction: + def __init__(self, backend, resource, function): + if not function in resource_types[resource]: + raise invalidResourceOperationException(f'Operation {function} is not defined for {resource}.') + + self._backend = backend + self._resource = resource + self._function = resource_types[resource][function] + + def __call__(self, **kwargs): + if 'data' in kwargs: + data = kwargs['data'] + else: + data = {} + + return self._backend.request( self._function['method'], self._function['request'].format(**kwargs), data=data ) + +class phpipamResource: + def __init__(self, backend, resource): + if not resource in resource_types: + raise invalidResourceException(f'Invalid resource "{resource}"') + + self._type = resource + self._backend = backend + + def __getattr__(self, attr): + return phpipamResourceFunction(self._backend, self._type, attr) + + def get(self): + """List of all objects""" + return self._backend.request('GET', f'/{self._type}') + + def byID(self, object_id): + """object identified by object_id : str""" + return self._backend.request('GET', f'/{self._type}/{object_id}') -- cgit v1.2.3