Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'type': 'A',
},
{
'id': 11189899,
'name': 'ttl',
'value': '3.2.3.4',
'ttl': 600,
'type': 'A',
}
])
# Domain exists, we don't care about return
resp.json.side_effect = ['{}']
wanted = Zone('unit.tests.', [])
wanted.add_record(Record.new(wanted, 'ttl', {
'ttl': 300,
'type': 'A',
'value': '3.2.3.4'
}))
plan = provider.plan(wanted)
self.assertEquals(2, len(plan.changes))
self.assertEquals(2, provider.apply(plan))
# recreate for update, and deletes for the 2 parts of the other
provider._client._request.assert_has_calls([
call('POST', '/123123/records', data={
'value': '3.2.3.4',
'type': 'A',
'name': 'ttl',
'ttl': 300
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
'values': ['1.1.1.3']}))
octo_records.append(Record.new(zone, 'cname', {
'ttl': 3,
'type': 'CNAME',
'value': 'a.unit.tests.'}))
octo_records.append(Record.new(zone, 'mx1', {
"tft3ohx3nvJl+bGCWxdtLYDSmir9PW54e5CTdxEh8MWRkBO3StF6" \
"QG/tAh3aTGDmkqhIJGLb87iHvpmVKqURmEUzJPv5KPJfWLofADI+" \
"q9lQIDAQAB"
zone = Zone('unit.tests.', [])
expected = set()
# A, subdomain=''
api_record.append({
'fieldType': 'A',
'ttl': 100,
'target': '1.2.3.4',
'subDomain': '',
'id': 1
})
expected.add(Record.new(zone, '', {
'ttl': 100,
'type': 'A',
'value': '1.2.3.4',
}))
# A, subdomain='sub
api_record.append({
'fieldType': 'A',
'ttl': 200,
'target': '1.2.3.4',
'subDomain': 'sub',
'id': 2
})
expected.add(Record.new(zone, 'sub', {
'ttl': 200,
'type': 'A',
'type': 'PTR',
'ttl': 30,
'value': 'a-ptr-2.example.com.'
}),
('8', {
'type': 'PTR',
'ttl': 3600,
'value': 'has-dup-def123.example.com.'
}),
('7', {
'type': 'PTR',
'ttl': 1800,
'value': 'some-host-abc123.example.com.'
}),
):
record = Record.new(expected, name, data)
expected.add_record(record)
changes = expected.changes(got, SimpleProvider())
self.assertEquals([], changes)
def test_sub_zones(self):
# NS for exactly the sub is allowed
zone = Zone('unit.tests.', set(['sub', 'barred']))
record = Record.new(zone, 'sub', {
'ttl': 3600,
'type': 'NS',
'values': ['1.2.3.4.', '2.3.4.5.'],
})
zone.add_record(record)
self.assertEquals(set([record]), zone.records)
# non-NS for exactly the sub is rejected
zone = Zone('unit.tests.', set(['sub', 'barred']))
record = Record.new(zone, 'sub', {
'ttl': 3600,
'type': 'A',
'values': ['1.2.3.4', '2.3.4.5'],
})
with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record)
# Skip type, remove trailing comments, and omit newline
line = line[1:].split('#', 1)[0]
# Split on :'s including :: and strip leading/trailing ws
line = [p.strip() for p in self.split_re.split(line)]
match = name_re.match(line[0])
if not match:
continue
name = zone.hostname_from_fqdn(line[0])
data[name][_type].append(line[1:])
for name, types in data.items():
for _type, d in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
data = data_for(_type, d)
if data:
record = Record.new(zone, name, data, source=self,
lenient=lenient)
try:
zone.add_record(record, lenient=lenient)
except SubzoneRecordException:
self.log.debug('_populate_normal: skipping subzone '
'record=%s', record)
_records = []
records = self._dns_client.record_sets.list_by_dns_zone
if self._check_zone(zone_name):
exists = True
for azrecord in records(self._resource_group, zone_name):
if _parse_azure_type(azrecord.type) in self.SUPPORTS:
_records.append(azrecord)
for azrecord in _records:
record_name = azrecord.name if azrecord.name != '@' else ''
typ = _parse_azure_type(azrecord.type)
data = getattr(self, '_data_for_{}'.format(typ))
data = data(azrecord)
data['type'] = typ
data['ttl'] = azrecord.ttl
record = Record.new(zone, record_name, data, source=self)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
exists = False
records = []
values = defaultdict(lambda: defaultdict(list))
for record in records:
values[record['subDomain']][record['fieldType']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
if _type not in self.SUPPORTS:
self.log.warning('Not managed record of type %s, skip',
_type)
continue
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
def _populate_from_file(self, filename, zone, lenient):
with open(filename, 'r') as fh:
yaml_data = safe_load(fh, enforce_order=self.enforce_order)
if yaml_data:
for name, data in yaml_data.items():
if not isinstance(data, list):
data = [data]
for d in data:
if 'ttl' not in d:
d['ttl'] = self.default_ttl
record = Record.new(zone, name, d, source=self,
lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.debug(
'_populate_from_file: successfully loaded "%s"', filename)
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
values = defaultdict(lambda: defaultdict(list))
for record in self.zone_records(zone):
_type = record['type']
if _type not in self.SUPPORTS:
continue
name = zone.hostname_from_fqdn(record['name'])
values[name][record['type']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records',
len(zone.records) - before)