aboutsummaryrefslogtreecommitdiff
path: root/phpipam/__init__.py
blob: 08a24668a40b2ce586d6cae030a2022513e4ede8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import requests
import json
import datetime
from dateutil.parser import parse as datetime_parse

class apiConnectionException(Exception):
    """Raised by phpipam._getApiToken when the API reports the login as unsuccessful."""

class apiQueryException(Exception):
    """Raised by phpipam._checkReq when a query does not come back marked as successful."""

class apiObjectNotFoundException(Exception):
    """Raised when a lookup (e.g. phpipam.getSectionByName) finds no matching object."""

class phpipam:
    """
    phpIPAM API Implementation
    ReadOnly because I don't need the writing bit

    https://phpipam.net/api-documentation/
    """

    def __init__(self, api_url, app_id, api_user, api_password):
        """
        Parameters
        ----------
        api_url : str
            URL of the phpIPAM instance. Example: https://phpipam.example.com/
        app_id : str
            AppID set in phpIPAM API settings
        api_user : str
            username, leave blank to use static token-authentication
        api_password : str
            user password or static auth token

        Raises
        ------
        apiConnectionException
            if the connection/authentification fails
        """

        self.api_url = api_url.strip('/') + '/api/' + app_id
        self.api_user = api_user
        self.api_password = api_password

        # Check for static auth
        if len(self.api_user) == 0:
            self.api_token = self.api_password
            self.api_token_expires = ""
        else:
            self._getApiToken()

    def _req( self, method, url, data = {} ):
        if self._isTokenExpired():
            self._getApiToken()

        return requests.request(method, self.api_url + url, data=data, headers={'token':self.api_token}).json()

    def _getApiToken(self):
        """
        Log in with username and password and remember the issued token

        POSTs to <api_url>/user using HTTP basic auth, then stores the
        'token' and 'expires' fields of the reply on the instance.

        Raises
        ------
        apiConnectionException
            if the reply's 'success' member is falsy
        """
        credentials = (self.api_user, self.api_password)
        response = requests.post(self.api_url + "/user", auth=credentials)
        reply = response.json()

        if not reply['success']:
            raise apiConnectionException('Failed to authenticate: ' + str(reply['code']))

        token_info = reply['data']
        self.api_token = token_info['token']
        self.api_token_expires = token_info['expires']


    def _isTokenExpired(self):
        # static auth does not expire
        if len(self.api_token_expires) == 0:
            return False

        expiration = datetime_parse(self.api_token_expires)

        return expiration < datetime.datetime.now()

    # TODO remove
    def _checkTokenExpired(self):
        data = self._req('GET', '/user/')
        return data['success']

    def _checkReq ( self, method, url, data = {} ):
        """Wrapper for _req for checking result and only returning data"""

        data = self._req( method, url, data )

        if not 'success' in data or not data['success']:
            raise apiQueryException("Query failed with code " + str(data['code'] + ": " + str(['message'])))

        return data['data']

    #
    def getSections(self):
        """
        Get the complete list of dictionaries describing sections
        Returns: list of dicts
        """

        return self._checkReq('GET', '/sections/')

    def getSectionById(self, section_id):
        """
        Get a section by id

        Parameters
        ----------
        section_id : str

        Returns: dict
        """

        return self._checkReq('GET', f'/sections/{section_id}')


    def getSectionByName(self, name):
        """
        Find a section by name

        Parameters
        ----------
        name : str
            name of the section to search for

        Returns: dict

        Raises
        ------
        apiObjectNotFoundException
            if no section matches name
        """

        data = self.getSections()

        for section in data:
            if 'name' in section and section['name'] == name:
                return section

        raise apiObjectNotFoundException(f"Section {name} was not found.")

    def getSubnets(self, section_id):
        """
        Get the complete list of dictionaries describing subnets of a section

        Parameters
        ----------
        section_id : str
        """

        return self._checkReq('GET', f'/sections/{section_id}/subnets')

    def getSubnetById(self, subnet_id):
        """
        Get a section by id

        Parameters
        ----------
        subnet_id : str
        """

        return self._checkReq('GET', f'/subnets/{subnet_id}')