1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
import requests
import json
import datetime
from dateutil.parser import parse as datetime_parse
class apiConnectionException(Exception):
    """Raised when connecting or authenticating to the phpIPAM API fails."""
class apiQueryException(Exception):
    """Raised when the phpIPAM API reports that a query was not successful."""
class apiObjectNotFoundException(Exception):
    """Raised when a lookup (e.g. a section by name) matches no object."""
class phpipam:
    """
    phpIPAM API Implementation

    Read-only client: only query (GET) endpoints are wrapped.
    https://phpipam.net/api-documentation/
    """

    def __init__(self, api_url, app_id, api_user, api_password):
        """
        Parameters
        ----------
        api_url : str
            URL of the phpIPAM instance. Example: https://phpipam.example.com/
        app_id : str
            AppID set in phpIPAM API settings
        api_user : str
            username, leave blank to use static token-authentication
        api_password : str
            user password or static auth token

        Raises
        ------
        apiConnectionException
            if the connection/authentication fails
        """
        self.api_url = api_url.strip('/') + '/api/' + app_id
        self.api_user = api_user
        self.api_password = api_password

        # Static auth: no user given, so the "password" is the token itself.
        # Truthiness (instead of len()) also accepts None as "blank".
        if not self.api_user:
            self.api_token = self.api_password
            # Empty string marks a token that never expires.
            self.api_token_expires = ""
        else:
            self._getApiToken()

    def _req(self, method, url, data=None):
        """
        Send a request to the API and return the decoded JSON response.

        The token is refreshed first if it has expired.

        Parameters
        ----------
        method : str
            HTTP method, e.g. 'GET'
        url : str
            path appended to the API base URL, e.g. '/sections/'
        data : dict, optional
            request body; defaults to an empty dict
        """
        if self._isTokenExpired():
            self._getApiToken()

        # Avoid a shared mutable default; an empty dict is what was sent before.
        payload = {} if data is None else data
        response = requests.request(
            method,
            self.api_url + url,
            data=payload,
            headers={'token': self.api_token},
        )
        return response.json()

    def _getApiToken(self):
        """
        Authenticate with user/password and store the token and its expiry.

        Raises
        ------
        apiConnectionException
            if the API does not report success
        """
        data = requests.post(
            self.api_url + "/user",
            auth=(self.api_user, self.api_password),
        ).json()

        if not data.get('success'):
            raise apiConnectionException('Failed to authenticate: ' + str(data.get('code')))

        self.api_token = data['data']['token']
        self.api_token_expires = data['data']['expires']

    def _isTokenExpired(self):
        """Return True if the stored token expiry lies in the past."""
        # static auth does not expire
        if not self.api_token_expires:
            return False

        expiration = datetime_parse(self.api_token_expires)
        # Use the parsed value's tzinfo so naive and aware timestamps both
        # compare cleanly (tzinfo is None for naive values -> naive now()).
        return expiration < datetime.datetime.now(expiration.tzinfo)

    # TODO remove
    def _checkTokenExpired(self):
        """Query '/user/' and return the 'success' field of the response."""
        data = self._req('GET', '/user/')
        return data['success']

    def _checkReq(self, method, url, data=None):
        """
        Wrapper for _req for checking result and only returning data

        Parameters
        ----------
        method : str
            HTTP method, e.g. 'GET'
        url : str
            path appended to the API base URL
        data : dict, optional
            request body passed through to _req

        Returns: the 'data' member of the response

        Raises
        ------
        apiQueryException
            if the response is missing 'success' or reports failure
        """
        result = self._req(method, url, data)

        if not result.get('success'):
            # .get() keeps the error readable even when the API omits
            # 'code' or 'message' from a failed response.
            raise apiQueryException(
                "Query failed with code "
                + str(result.get('code'))
                + ": "
                + str(result.get('message'))
            )
        return result['data']

    def getSections(self):
        """
        Get the complete list of dictionaries describing sections

        Returns: list of dicts
        """
        return self._checkReq('GET', '/sections/')

    def getSectionById(self, section_id):
        """
        Get a section by id

        Parameters
        ----------
        section_id : str

        Returns: dict
        """
        return self._checkReq('GET', f'/sections/{section_id}')

    def getSectionByName(self, name):
        """
        Find a section by name

        Parameters
        ----------
        name : str
            name of the section to search for

        Returns: dict

        Raises
        ------
        apiObjectNotFoundException
            if no section matches name
        """
        for section in self.getSections():
            if 'name' in section and section['name'] == name:
                return section

        raise apiObjectNotFoundException(f"Section {name} was not found.")

    def getSubnets(self, section_id):
        """
        Get the complete list of dictionaries describing subnets of a section

        Parameters
        ----------
        section_id : str

        Returns: the 'data' payload of the API response
        """
        return self._checkReq('GET', f'/sections/{section_id}/subnets')

    def getSubnetById(self, subnet_id):
        """
        Get a subnet by id

        Parameters
        ----------
        subnet_id : str

        Returns: the 'data' payload of the API response
        """
        return self._checkReq('GET', f'/subnets/{subnet_id}')
|