aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jonas Gunz <himself@jonasgunz.de> 2020-12-11 00:23:25 +0100
committerGravatar Jonas Gunz <himself@jonasgunz.de> 2020-12-11 00:23:25 +0100
commit17bd5b60a6d1445d208c1486c5b937c7f7881154 (patch)
tree691381c4e455e1a8a9599d0da8540b7f7b81b6d2
parentc22aedfb52c291ab791c3dfe6209c69047157cac (diff)
downloadpython-phpipam-17bd5b60a6d1445d208c1486c5b937c7f7881154.tar.gz
rework
-rwxr-xr-xphpipam/__init__.py156
-rwxr-xr-xphpipam/backend.py76
-rwxr-xr-xphpipam/resources.py66
3 files changed, 147 insertions, 151 deletions
diff --git a/phpipam/__init__.py b/phpipam/__init__.py
index 08a2466..89d8280 100755
--- a/phpipam/__init__.py
+++ b/phpipam/__init__.py
@@ -1,17 +1,5 @@
-
-import requests
-import json
-import datetime
-from dateutil.parser import parse as datetime_parse
-
-class apiConnectionException(Exception):
- pass
-
-class apiQueryException(Exception):
- pass
-
-class apiObjectNotFoundException(Exception):
- pass
+from .backend import phpipamBackend
+from .resources import phpipamResource
class phpipam:
"""
@@ -22,141 +10,7 @@ class phpipam:
"""
def __init__(self, api_url, app_id, api_user, api_password):
- """
- Parameters
- ----------
- api_url : str
- URL of the phpIPAM instance. Example: https://phpipam.example.com/
- app_id : str
- AppID set in phpIPAM API settings
- api_user : str
- username, leave blank to use static token-authentication
- api_password : str
- user password or static auth token
-
- Raises
- ------
- apiConnectionException
- if the connection/authentification fails
- """
-
- self.api_url = api_url.strip('/') + '/api/' + app_id
- self.api_user = api_user
- self.api_password = api_password
-
- # Check for static auth
- if len(self.api_user) == 0:
- self.api_token = self.api_password
- self.api_token_expires = ""
- else:
- self._getApiToken()
-
- def _req( self, method, url, data = {} ):
- if self._isTokenExpired():
- self._getApiToken()
-
- return requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json()
-
- def _getApiToken(self):
- data = requests.post(self.api_url + "/user", auth=(self.api_user,self.api_password)).json()
- if not data['success']:
- raise apiConnectionException('Failed to authenticate: ' + str(data['code']))
-
- self.api_token = data['data']['token']
- self.api_token_expires = data['data']['expires']
-
-
- def _isTokenExpired(self):
- # static auth does not expire
- if len(self.api_token_expires) == 0:
- return False
-
- expiration = datetime_parse(self.api_token_expires)
-
- return expiration < datetime.datetime.now()
-
- # TODO remove
- def _checkTokenExpired(self):
- data = self._req('GET', '/user/')
- return data['success']
-
- def _checkReq ( self, method, url, data = {} ):
- """Wrapper for _req for checking result and only returning data"""
-
- data = self._req( method, url, data )
-
- if not 'success' in data or not data['success']:
- raise apiQueryException("Query failed with code " + str(data['code'] + ": " + str(['message'])))
-
- return data['data']
-
- #
- def getSections(self):
- """
- Get the complete list of dictionaries describing sections
- Returns: list of dicts
- """
-
- return self._checkReq('GET', '/sections/')
-
- def getSectionById(self, section_id):
- """
- Get a section by id
-
- Parameters
- ----------
- section_id : str
-
- Returns: dict
- """
-
- return self._checkReq('GET', f'/sections/{section_id}')
-
-
- def getSectionByName(self, name):
- """
- Find a section by name
-
- Parameters
- ----------
- name : str
- name of the section to search for
-
- Returns: dict
-
- Raises
- ------
- apiObjectNotFoundException
- if no section matches name
- """
-
- data = self.getSections()
-
- for section in data:
- if 'name' in section and section['name'] == name:
- return section
-
- raise apiObjectNotFoundException(f"Section {name} was not found.")
-
- def getSubnets(self, section_id):
- """
- Get the complete list of dictionaries describing subnets of a section
-
- Parameters
- ----------
- section_id : str
- """
-
- return self._checkReq('GET', f'/sections/{section_id}/subnets')
-
- def getSubnetById(self, subnet_id):
- """
- Get a section by id
-
- Parameters
- ----------
- subnet_id : str
- """
-
- return self._checkReq('GET', f'/subnets/{subnet_id}')
+ self._backend = phpipamBackend(api_url, app_id, api_user, api_password)
+ def __getattr__(self, item):
+ return phpipamResource(self._backend, item)
diff --git a/phpipam/backend.py b/phpipam/backend.py
new file mode 100755
index 0000000..81af316
--- /dev/null
+++ b/phpipam/backend.py
@@ -0,0 +1,76 @@
+import requests
+import json
+import datetime
+from dateutil.parser import parse as datetime_parse
+
+class apiConnectionException(Exception):
+ pass
+
+class apiQueryException(Exception):
+ pass
+
+class apiObjectNotFoundException(Exception):
+ pass
+
+class phpipamBackend:
+ def __init__(self, api_url, app_id, api_user, api_password):
+ """
+ Parameters
+ ----------
+ api_url : str
+ URL of the phpIPAM instance. Example: https://phpipam.example.com/
+ app_id : str
+ AppID set in phpIPAM API settings
+ api_user : str
+ username, leave blank to use static token-authentication
+ api_password : str
+ user password or static auth token
+
+ Raises
+ ------
+ apiConnectionException
+ if the connection/authentification fails
+ """
+
+ self.api_url = api_url.strip('/') + '/api/' + app_id
+ self.api_user = api_user
+ self.api_password = api_password
+
+ # Check for static auth
+ if len(self.api_user) == 0:
+ self.api_token = self.api_password
+ self.api_token_expires = ""
+ else:
+ self._getApiToken()
+
+ def _getApiToken(self):
+ data = requests.post(self.api_url + "/user", auth=(self.api_user,self.api_password)).json()
+ if not data['success']:
+ raise apiConnectionException('Failed to authenticate: ' + str(data['code']))
+
+ self.api_token = data['data']['token']
+ self.api_token_expires = data['data']['expires']
+
+
+ def _isTokenExpired(self):
+ # static auth does not expire
+ if len(self.api_token_expires) == 0:
+ return False
+
+ expiration = datetime_parse(self.api_token_expires)
+
+ return expiration < datetime.datetime.now()
+
+ def request ( self, method, url, data = {} ):
+ """Wrapper for _req for checking result and only returning data"""
+
+ if self._isTokenExpired():
+ self._getApiToken()
+
+ data = requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json()
+
+ if not 'success' in data or not data['success']:
+ raise apiQueryException("Query failed with code " + str(data['code']) + ": " + str(data['message']))
+
+ return data['data']
+
diff --git a/phpipam/resources.py b/phpipam/resources.py
new file mode 100755
index 0000000..597560a
--- /dev/null
+++ b/phpipam/resources.py
@@ -0,0 +1,66 @@
+from .backend import phpipamBackend
+
+resource_types = {
+ 'sections' : {
+ 'getSubnets':{
+ 'method':'GET',
+ 'request':'/sections/{object_id}/subnets',
+ }
+ },
+ 'subnets' : {
+ 'search':{
+ 'method':'GET',
+ 'request':'/subnets/search/{search}'
+ },
+ 'getIP':{
+ 'method':'GET',
+ 'requests':'/subnets/'
+ }
+ },
+ 'addresses' : {
+ },
+ 'devices' : {
+ },
+}
+
+class invalidResourceException(Exception):
+ pass
+
+class invalidResourceOperationException(Exception):
+ pass
+
+class phpipamResourceFunction:
+ def __init__(self, backend, resource, function):
+ if not function in resource_types[resource]:
+ raise invalidResourceOperationException(f'Operation {function} is not defined for {resource}.')
+
+ self._backend = backend
+ self._resource = resource
+ self._function = resource_types[resource][function]
+
+ def __call__(self, **kwargs):
+ if 'data' in kwargs:
+ data = kwargs['data']
+ else:
+ data = {}
+
+ return self._backend.request( self._function['method'], self._function['request'].format(**kwargs), data=data )
+
+class phpipamResource:
+ def __init__(self, backend, resource):
+ if not resource in resource_types:
+ raise invalidResourceException(f'Invalid resource "{resource}"')
+
+ self._type = resource
+ self._backend = backend
+
+ def __getattr__(self, attr):
+ return phpipamResourceFunction(self._backend, self._type, attr)
+
+ def get(self):
+ """List of all objects"""
+ return self._backend.request('GET', f'/{self._type}')
+
+ def byID(self, object_id):
+ """object identified by object_id : str"""
+ return self._backend.request('GET', f'/{self._type}/{object_id}')