aboutsummaryrefslogtreecommitdiff
path: root/phpipam/backend.py
blob: 81af316505def7186f046278c4a4afa4aaefc32c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import requests
import json
import datetime
from dateutil.parser import parse as datetime_parse

class apiConnectionException(Exception):
    """Signals that authenticating against the phpIPAM API did not succeed."""

class apiQueryException(Exception):
    """Signals that the phpIPAM API answered a query with a failure result."""

class apiObjectNotFoundException(Exception):
    """Signals a missing object.

    NOTE(review): nothing in this module raises it; presumably it is meant for
    callers that look up a single object -- confirm against the users of this
    module.
    """

class phpipamBackend:
    """Thin HTTP client for the phpIPAM REST API.

    Builds the API base URL from the instance URL and app id, obtains an
    access token (or uses a static one) and sends authenticated requests
    through :meth:`request`.
    """

    def __init__(self, api_url, app_id, api_user, api_password):
        """
        Parameters
        ----------
        api_url : str
            URL of the phpIPAM instance. Example: https://phpipam.example.com/
        app_id : str
            AppID set in phpIPAM API settings
        api_user : str
            username, leave blank (empty string or None) to use static
            token-authentication
        api_password : str
            user password or static auth token

        Raises
        ------
        apiConnectionException
            if the API rejects the credentials. Transport-level errors raised
            by ``requests`` are not caught here and propagate unchanged.
        """

        self.api_url = api_url.strip('/') + '/api/' + app_id
        self.api_user = api_user
        self.api_password = api_password

        # No user name (empty or None) selects static token authentication:
        # the password is used directly as the token and never refreshed.
        if not self.api_user:
            self.api_token = self.api_password
            self.api_token_expires = ""
        else:
            self._getApiToken()

    def _getApiToken(self):
        """Log in with user/password and store the token and its expiry.

        Raises
        ------
        apiConnectionException
            if the API reply is not flagged as successful
        """
        response = requests.post(
            self.api_url + "/user",
            auth=(self.api_user, self.api_password),
        ).json()

        # .get() so that a reply lacking 'success'/'code' still surfaces as an
        # authentication failure instead of a KeyError.
        if not response.get('success'):
            raise apiConnectionException(
                'Failed to authenticate: ' + str(response.get('code'))
            )

        self.api_token = response['data']['token']
        self.api_token_expires = response['data']['expires']

    def _isTokenExpired(self):
        """Return True when the stored token expiry lies in the past.

        Static tokens carry an empty expiry and are never reported as expired.
        """
        # static auth does not expire
        if not self.api_token_expires:
            return False

        expiration = datetime_parse(self.api_token_expires)

        # Compare like with like: a naive expiry is checked against the naive
        # local clock (tzinfo is None), an aware one against "now" in its own
        # zone; mixing both would raise TypeError.
        # NOTE(review): a naive expiry is assumed to be in this host's local
        # time zone -- confirm against the server configuration.
        return expiration < datetime.datetime.now(expiration.tzinfo)

    def request(self, method, url, data=None):
        """Send an authenticated API request and return its ``data`` payload.

        The token is renewed first if it has expired.

        Parameters
        ----------
        method : str
            HTTP method handed to ``requests.request``
        url : str
            path appended to the API base URL
        data : dict, optional
            request body handed to ``requests.request``; defaults to an
            empty dict

        Returns
        -------
        The value stored under the ``data`` key of the JSON reply.

        Raises
        ------
        apiQueryException
            if the reply has no truthy ``success`` field
        """

        if self._isTokenExpired():
            self._getApiToken()

        # Fresh dict per call instead of a shared mutable default argument.
        payload = {} if data is None else data

        response = requests.request(
            method,
            self.api_url + url,
            data=payload,
            headers={'token': self.api_token},
        ).json()

        if not response.get('success'):
            # .get() keeps the failure report intact even if the server
            # omitted 'code' or 'message'.
            raise apiQueryException(
                "Query failed with code " + str(response.get('code'))
                + ": " + str(response.get('message'))
            )

        return response['data']