1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#from octodns.zone import Zone
from octodns.provider.base import BaseProvider
from octodns.provider.plan import Plan
#from octodns.source.axfr import ZoneFileSource
import logging
import dns.zone
import dns.rdataclass
import dns.rdatatype
class ZoneFileProvider(BaseProvider):
SUPPORTS_GEO=False
SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NS', 'PTR', 'SPF',
'SRV', 'TXT'))
def __init__(self, id, directory):
'''
Arguments
=========
id: str
directory: str
check_origin: bool
'''
self.log = logging.getLogger('ZoneFileProvider[{}]'.format(id))
self.log.debug('__init__: directory={}'.format(directory))
self.directory = directory
super(ZoneFileProvider, self).__init__(id)
def populate(self, zone, target=False, lenient=False):
if target:
return False
raise NotImplementedError("ZoneFileProvider only implements the target part."+
" Use OctoDns' own ZoneFileSource to read from ZoneFiles.")
def _apply(self,plan):
'''
Arguments
=========
plan: octodns.provider.plan.Plan
'''
records = plan.desired.records
zone = dns.zone.Zone(plan.desired.name)
for record in plan.desired.records:
data = record.data
name = record.name
rdset = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.from_text(record._type))
if 'value' in data:
rdset.add(dns.rdata.from_text(dns.rdataclass.IN,
dns.rdatatype.from_text(record._type), data['value']), ttl=int(data['ttl'] ))
elif 'values' in data:
for value in data['values']:
rdset.add(dns.rdata.from_text(dns.rdataclass.IN,
dns.rdatatype.from_text(record._type), value), ttl=int(data['ttl'] ))
else:
self.log.warning("neither value nor values found in {}".format(name))
continue
zone.replace_rdataset(name, rdset)
zone.to_file( self.directory + '/' + plan.desired.name)
|