aboutsummaryrefslogtreecommitdiff
path: root/octodns-custom-providers/source/phpipam.py
blob: e3d4c28ed34d8e5e3fb3b4c70502fd5709995466 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#import octodns.zone.Zone
import octodns.record
import octodns.source.base
import phpipam
import logging
import re

class PhpipamSource(octodns.source.base.BaseSource):
    '''
    octoDNS source that creates A/AAAA records from phpIPAM address entries.

    The configured CIDRs are looked up in phpIPAM; every address in those
    subnets that carries the configured tag and whose hostname lies inside
    the zone being populated is turned into a record.
    '''
    SUPPORTS_GEO = False
    SUPPORTS = set(('A', 'AAAA'))

    def __init__(self, id, url, user, token, appid, cidrs, tag='Used', default_ttl='3600'):
        '''
        Arguments
        =========
        id: str
            Identifier of this source, also used in the logger name
        url: str
            Passed to phpipam.PhpipamAPI
        user: str
            Passed to phpipam.PhpipamAPI
        token: str
            Passed to phpipam.PhpipamAPI
        appid: str
            Passed to phpipam.PhpipamAPI
        cidrs: str[]
            list of cidrs to search
        tag: str
            Address tag name
        default_ttl: str
            TTL put on every generated record
        '''
        self.log = logging.getLogger('PhpipamSource[{}]'.format(id))
        # The token and user are deliberately not logged.
        self.log.debug('__init__: id=%s url=%s appid=%s', id, url, appid)

        super(PhpipamSource, self).__init__(id)

        self._default_ttl = default_ttl
        self._tag = tag
        self._cidrs = cidrs
        self._ipam = phpipam.PhpipamAPI(url, appid, user, token)

    def populate(self, zone, target=False, lenient=False):
        '''
        Add records for all matching phpIPAM addresses to `zone`.

        Arguments
        =========
        zone: octodns zone object
            Zone to populate; only hostnames inside zone.name are used
        target: bool
            Unused by this source
        lenient: bool
            Forwarded to Record.new

        Returns False when the configured tag does not exist in phpIPAM,
        True otherwise.
        '''
        ipam = self._ipam
        tag = self._tag
        zone_name = zone.name.strip('.')
        # Leading dot enforces a label boundary, so that 'fooexample.com'
        # is not treated as part of the zone 'example.com'.
        zone_suffix = '.' + zone_name

        # Resolve the tag name to its id. tag_id must be initialised so
        # that a missing tag reaches the error path below instead of
        # raising UnboundLocalError.
        tag_id = None
        for _tag in ipam.addresses.getTags() or []:
            if _tag['type'] == tag:
                tag_id = _tag['id']
        if tag_id is None or tag_id == '':
            self.log.error(f'populate(): tag {tag} was not found.')
            return False

        self.log.debug(f'populate(): tag {tag} has id {tag_id}')

        # (record name, record type) -> list of IPs
        selected_addresses = {}

        for _cidr in self._cidrs:
            subnets = ipam.subnets.search(search=_cidr)

            if not subnets or len(subnets) != 1:
                self.log.warning(f'populate(): CIDR {_cidr} has no or no exact match. Ignoring.')
                continue
            subnet = subnets[0]

            addresses = ipam.subnets.getAddresses(subnet_id=subnet['id'])
            # NOTE(review): `or []` assumes the client may return None for
            # an empty subnet - confirm against the phpipam client.
            for _address in addresses or []:
                if _address['tag'] != tag_id:
                    continue

                hostname = _address['hostname']
                if not hostname:
                    # Addresses without a hostname cannot become records.
                    continue

                # Convert the FQDN into a name relative to the zone;
                # the zone apex itself maps to the empty name.
                if hostname == zone_name:
                    name = ''
                elif hostname.endswith(zone_suffix):
                    name = hostname[:-len(zone_suffix)]
                else:
                    continue

                ip = _address['ip']
                # Only IPv6 literals contain a colon.
                record_type = 'AAAA' if ':' in ip else 'A'
                selected_addresses.setdefault((name, record_type), []).append(ip)

        for (name, record_type), ips in selected_addresses.items():
            data = {
                'type': record_type,
                'ttl': self._default_ttl,
                'values': ips,
            }

            new_record = octodns.record.Record.new(zone, name, data, source=self, lenient=lenient)
            zone.add_record(new_record)

        return True