1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
from octodns.provider.base import BaseProvider
import logging
import dns.zone
import dns.rdataclass
import dns.rdatatype
import os
class RdataParameterException(Exception):
    '''
    Signals that the data handed to the rdata builder in this module is
    incomplete (a required field was not supplied).
    '''

    def __init__(self, msg):
        # Forward the message unchanged so str(exc) yields exactly `msg`.
        super(RdataParameterException, self).__init__(msg)
def _create_rdata(rdclass, rdtype, data):
    '''
    Build a dnspython rdata object from either text or a field mapping.

    Arguments
    =========
    rdclass:
        DNS class, passed straight through to dnspython
    rdtype:
        DNS record type, passed straight through to dnspython
    data: str or dict
        A str is parsed with dns.rdata.from_text. Anything else is treated
        as a mapping of field name -> value and expanded as keyword
        arguments to the rdata class registered for (rdclass, rdtype).

    Raises
    ======
    RdataParameterException
        if the mapping lacks one or more of the names listed in the rdata
        class's __slots__
    '''
    if isinstance(data, str):
        return dns.rdata.from_text(rdclass, rdtype, data)

    cls = dns.rdata.get_rdata_class(rdclass, rdtype)
    # Validate up front and name every absent field at once, so a broken
    # config can be fixed in one pass instead of one error per run.
    missing = [slot for slot in cls.__slots__ if slot not in data]
    if missing:
        raise RdataParameterException(
            '{} is missing'.format(', '.join(missing)))
    return cls(rdclass, rdtype, **data)
class ZoneFileProvider(BaseProvider):
    '''
    Target-only provider that writes the desired state of a zone to a
    zone file using dnspython.

    The SOA record is not taken from the plan; it is built from the `soa`
    argument given to the constructor.

    SOA dict
    {
        mname
        rname
        serial
        refresh
        retry
        expire
        ttl
    }

    NOTE(review): the dict is validated against the __slots__ of
    dnspython's SOA rdata class, which presumably names the last field
    `minimum` rather than `ttl` - confirm against the installed dnspython.
    '''
    SUPPORTS_GEO = False
    SUPPORTS = {'A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NS', 'PTR', 'SPF',
                'SRV', 'TXT'}

    def __init__(self, id, directory, soa, soa_ttl=3600, file_extension=''):
        '''
        Arguments
        =========
        id: str
            provider id; used in the logger name and passed to BaseProvider
        directory: str
            directory the zone files are written into
        soa: dict
            fields of the SOA record. `serial` may be a string of the form
            'env/NAME', in which case the integer value of the environment
            variable NAME is used instead.
        soa_ttl: int
            TTL given to the SOA rdataset
        file_extension: str
            appended verbatim to the zone name to form the file name

        Raises
        ======
        KeyError
            if `soa` has no 'serial' entry or the referenced environment
            variable is not set
        ValueError
            if the referenced environment variable is not an integer
        '''
        self.log = logging.getLogger('ZoneFileProvider[{}]'.format(id))
        self.log.debug('__init__: directory=%s', directory)
        self.directory = directory
        self.file_extension = file_extension
        # Work on a private copy: resolving the serial below must not
        # rewrite the dict owned by the caller.
        self.soa = dict(soa)
        self.soa_ttl = soa_ttl
        # OctoDNS does not recursively check dicts for 'env/' keyword
        # TODO Error handling
        serial = self.soa['serial']
        if isinstance(serial, str) and serial.startswith('env/'):
            self.soa['serial'] = int(os.environ[serial.split('/', 1)[1]])
        # The logger is set before this call and the base class is
        # initialised last, as in the original ordering.
        super().__init__(id)

    def populate(self, zone, target=False, lenient=False):
        '''
        Arguments
        =========
        zone:
            zone to populate; never modified by this provider
        target: bool
            True when the provider is used as a target
        lenient: bool
            unused

        Returns False when used as a target (nothing is read back from
        disk). Raises NotImplementedError when used as a source.
        '''
        if target:
            return False
        raise NotImplementedError(
            "ZoneFileProvider only implements the target part." +
            " Use OctoDns' own ZoneFileSource to read from ZoneFiles.")

    def _apply(self, plan):
        '''
        Write all desired records of the plan, plus the configured SOA,
        to <directory>/<zone name><file_extension>.

        Arguments
        =========
        plan: octodns.provider.plan.Plan
        '''
        rdclass = dns.rdataclass.IN
        zone = dns.zone.Zone(plan.desired.name)

        # The SOA comes from the provider configuration, not from the plan.
        soaset = dns.rdataset.Rdataset(rdclass, dns.rdatatype.SOA)
        soaset.add(
            _create_rdata(rdclass, dns.rdatatype.SOA, self.soa),
            self.soa_ttl)
        zone.replace_rdataset('@', soaset)

        for record in plan.desired.records:
            data = record.data
            name = record.name
            # Normalise single-value and multi-value records to one list so
            # both are handled by the same loop below.
            if 'value' in data:
                values = [data['value']]
            elif 'values' in data:
                values = data['values']
            else:
                self.log.warning(
                    "neither value nor values found in %s", name)
                continue

            rdtype = dns.rdatatype.from_text(record._type)
            ttl = int(data['ttl'])
            rdset = dns.rdataset.Rdataset(rdclass, rdtype)
            for value in values:
                rdset.add(_create_rdata(rdclass, rdtype, value), ttl=ttl)
            zone.replace_rdataset(name, rdset)

        # os.path.join keeps an empty `directory` relative instead of
        # producing a path rooted at '/'.
        zone.to_file(os.path.join(
            self.directory, plan.desired.name + self.file_extension))
|