From 874f62e0ab04cdb72814b60cd20ab4459fa9b4a5 Mon Sep 17 00:00:00 2001 From: Jonas Gunz Date: Wed, 13 Jan 2021 21:31:30 +0100 Subject: rename package to phpipam-api --- phpipam_api/__init__.py | 40 ++++++++++++++ phpipam_api/backend.py | 61 +++++++++++++++++++++ phpipam_api/resources.py | 134 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100755 phpipam_api/__init__.py create mode 100755 phpipam_api/backend.py create mode 100755 phpipam_api/resources.py (limited to 'phpipam_api') diff --git a/phpipam_api/__init__.py b/phpipam_api/__init__.py new file mode 100755 index 0000000..1dc065f --- /dev/null +++ b/phpipam_api/__init__.py @@ -0,0 +1,40 @@ +# +# phpipam/__init__.py +# (c) 2021 Jonas Gunz +# License: MIT +# +from .backend import PhpipamBackend +from .resources import PhpipamResource + +class PhpipamAPI: + """ + phpIPAM API Implementation + + Attributes + ---------- + sections + subnets + addresses + devices + + https://phpipam.net/api-documentation/ + """ + + def __init__(self, api_url, app_id, api_user, api_password): + """ + Parameters + ---------- + api_url : str + URL of phpIPAM instance. Example: https://phpipam.example.com/ + app_id : str + AppID configrued in API settings + api_user : str + username, leave empty to use static token authentification + api_password : str + password or static authentification token + """ + + self._backend = PhpipamBackend(api_url, app_id, api_user, api_password) + + def __getattr__(self, item): + return PhpipamResource(self._backend, item) diff --git a/phpipam_api/backend.py b/phpipam_api/backend.py new file mode 100755 index 0000000..85079df --- /dev/null +++ b/phpipam_api/backend.py @@ -0,0 +1,61 @@ +# +# phpipam/backend.py +# (c) 2021 Jonas Gunz +# License: MIT +# +import requests +import json +import datetime +from dateutil.parser import parse as datetime_parse + +class ApiConnectionException(Exception): + pass + +class ApiQueryException(Exception): + pass + +class ApiObjectNotFoundException(Exception): + pass + +class PhpipamBackend: + def __init__(self, api_url, app_id, api_user, api_password): + self.api_url = api_url.strip('/') + '/api/' + app_id + self.api_user = api_user + self.api_password = api_password + + # Check for static auth + if len(self.api_user) == 0: + self.api_token = self.api_password + self.api_token_expires = "" + else: + self._getApiToken() + + def _getApiToken(self): + data = requests.post(self.api_url + "/user", auth=(self.api_user,self.api_password)).json() + if not data['success']: + raise ApiConnectionException('Failed to authenticate: ' + str(data['code']) + ' ' + data['message']) + + self.api_token = data['data']['token'] + self.api_token_expires = data['data']['expires'] + + + def _isTokenExpired(self): + # static auth does not expire + if len(self.api_token_expires) == 0: + return False + + expiration = datetime_parse(self.api_token_expires) + + return expiration < datetime.datetime.now() + + def request ( self, method, url, data = {} ): + if self._isTokenExpired(): + self._getApiToken() + + data = requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json() + + if not 'success' in data or not data['success']: + raise ApiQueryException("Query failed with code " + str(data['code']) + ": " + str(data['message'])) + + return data['data'] + diff --git a/phpipam_api/resources.py b/phpipam_api/resources.py new file mode 100755 index 0000000..8f44c17 --- /dev/null +++ b/phpipam_api/resources.py @@ -0,0 +1,134 @@ +# +# phpipam/resources.py +# (c) 2021 Jonas Gunz +# License: MIT +# +from .backend import PhpipamBackend + +# Custom functions are defined here +resource_types = { + 'sections' : { + 'getSubnets':{ + 'method':'GET', + 'request':'/sections/{section_id}/subnets', + } + }, + 'subnets' : { + 'search':{ + 'method':'GET', + 'request':'/subnets/search/{search}' + }, + 'getIP':{ + 'method':'GET', + 'request':'/subnets/{subnet_id}/addresses/{ip}/' + }, + 'getAddresses':{ + 'method':'GET', + 'request':'/subnets/{subnet_id}/addresses/' + }, + }, + 'addresses' : { + 'getByIP':{ + 'method':'GET', + 'request':'/addresses/{ip}/{subnet_id}/' + }, + 'getByTag':{ + 'method':'GET', + 'request':'/addresses/tags/{tag_id}/addresses/' + }, + 'search':{ + 'method':'GET', + 'request':'/addresses/search/{ip}/' + }, + 'getFirstFree':{ + 'method':'GET', + 'request':'/addresses/first_free/{subnet_id}/' + }, + 'getTags':{ + 'method':'GET', + 'request':'/addresses/tags/' + }, + 'getTag':{ + 'method':'GET', + 'request':'/addresses/tags/{tag_id}/' + }, + 'createFirstFree':{ + 'method':'POST', + 'request':'/addresses/first_free/{subnet_id}/' + } + }, + 'vlan':{}, + 'l2domains':{}, + 'vrf':{}, + 'devices' : { + 'getAddresses':{ + 'method':'GET', + 'request':'/devices/{device_id}/addresses/' + }, + 'getSubnets':{ + 'method':'GET', + 'request':'/devices/{device_id}/subnets/' + } + }, + 'prefix':{}, +} + +class InvalidResourceException(Exception): + pass + +class InvalidResourceOperationException(Exception): + pass + +class InvalidResourceOperationArgumentException(Exception): + pass + +class PhpipamResourceFunction: + def __init__(self, backend, resource, function): + if not function in resource_types[resource]: + raise InvalidResourceOperationException(f'Operation {function} is not defined for {resource}.') + + self._backend = backend + self._resource = resource + self._function = resource_types[resource][function] + self._name = function + + def __call__(self, **kwargs): + if 'data' in kwargs: + data = kwargs['data'] + else: + data = {} + try: + return self._backend.request( self._function['method'], self._function['request'].format(**kwargs), data=data ) + except KeyError as e: + raise InvalidResourceOperationArgumentException( f'{self._resource}.{self._name}: Missing arguments: {e.args}' ) + + +class PhpipamResource: + def __init__(self, backend, resource): + if not resource in resource_types: + raise InvalidResourceException(f'Invalid resource "{resource}"') + + self._type = resource + self._backend = backend + + def __getattr__(self, attr): + return PhpipamResourceFunction(self._backend, self._type, attr) + + # Functions every ObjectType shares + + def get(self): + """List of all objects""" + return self._backend.request('GET', f'/{self._type}') + + def byID(self, object_id): + """object identified by object_id : str""" + return self._backend.request('GET', f'/{self._type}/{object_id}') + + def create(self, data): + return self._backend.request('POST', f'/{self._type}/{object_id}', data=data) + + def edit(self, data): + return self._backend.request('PATCH', f'/{self._type}/{object_id}', data=data) + + def delete(self, object_id): + return self._backend.request('DELETE', f'/{self._type}/{object_id}') -- cgit v1.2.3