Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
provider._request = Mock()
provider._request.side_effect = [
self.empty, # no zones
{
'result': {
'id': 42,
}
}, # zone create
None,
None,
]
# Add something and delete something
zone = Zone('unit.tests.', [])
existing = Record.new(zone, '', {
'ttl': 300,
'type': 'NS',
# This matches the zone data above, one to delete, one to leave
'values': ['ns1.foo.bar.', 'ns2.foo.bar.'],
})
new = Record.new(zone, '', {
'ttl': 300,
'type': 'NS',
# This leaves one and deletes one
'value': 'ns2.foo.bar.',
})
change = Update(existing, new)
plan = Plan(zone, zone, [change])
provider._apply(plan)
provider._request.assert_has_calls([
with self.assertRaises(ValidationError) as ctx:
# The . will put this over the edge
name = 'x' * 64
Record.new(self.zone, name, {
'ttl': 300,
'type': 'A',
'value': '1.2.3.4',
})
reason = ctx.exception.reasons[0]
self.assertTrue(reason.startswith('invalid name, "xxxx'))
self.assertTrue(reason.endswith('xxx" is too long at 64'
' chars, max is 63'))
# no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'value': '1.2.3.4',
})
self.assertEquals(['missing ttl'], ctx.exception.reasons)
# invalid ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': -1,
'value': '1.2.3.4',
})
self.assertEquals('www.unit.tests.', ctx.exception.fqdn)
self.assertEquals(['invalid ttl'], ctx.exception.reasons)
# no exception if we're in lenient mode
def test_existing_nameservers(self):
ns_values = ['8.8.8.8.', '9.9.9.9.']
provider = PowerDnsProvider('test', 'non.existant', 'api-key',
nameserver_values=ns_values)
expected = Zone('unit.tests.', [])
ns_record = Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ns_values
})
expected.add_record(ns_record)
# no changes
with requests_mock() as mock:
data = {
'rrsets': [{
'comments': [],
'name': 'unit.tests.',
'records': [
{
'content': '8.8.8.8.',
'disabled': False
name = 'x' * (253 - len(self.zone.name))
Record.new(self.zone, name, {
'ttl': 300,
'type': 'A',
'value': '1.2.3.4',
})
reason = ctx.exception.reasons[0]
self.assertTrue(reason.startswith('invalid fqdn, "xxxx'))
self.assertTrue(reason.endswith('.unit.tests." is too long at 254'
' chars, max is 253'))
# label length, DNS defins max as 63
with self.assertRaises(ValidationError) as ctx:
# The . will put this over the edge
name = 'x' * 64
Record.new(self.zone, name, {
'ttl': 300,
'type': 'A',
'value': '1.2.3.4',
})
reason = ctx.exception.reasons[0]
self.assertTrue(reason.startswith('invalid name, "xxxx'))
self.assertTrue(reason.endswith('xxx" is too long at 64'
' chars, max is 63'))
# no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'value': '1.2.3.4',
})
self.assertEquals(['missing ttl'], ctx.exception.reasons)
},
])
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(1, len(zone.records))
record = list(zone.records)[0]
self.assertEquals('', record.name)
self.assertEquals('unit.tests.', record.fqdn)
self.assertEquals('ALIAS', record._type)
self.assertEquals('unit.tests.cdn.cloudflare.net.', record.value)
# CDN enabled records can't be updated, we don't know the real values
# never point a Cloudflare record to itsself.
wanted = Zone('unit.tests.', [])
wanted.add_record(Record.new(wanted, '', {
'ttl': 300,
'type': 'ALIAS',
'value': 'change.unit.tests.cdn.cloudflare.net.'
}))
plan = provider.plan(wanted)
self.assertEquals(False, hasattr(plan, 'changes'))
def test_CNAME(self):
# doesn't blow up
Record.new(self.zone, 'www', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com.',
})
# root cname is a no-no
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com.',
})
self.assertEquals(['root CNAME not allowed'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
again = Zone('unit.tests.', [])
provider.populate(again)
self.assertEquals(14, len(again.records))
# bust the cache
del provider._zone_records[zone.name]
# test handling of invalid content
with requests_mock() as mock:
with open('tests/fixtures/dnsimple-invalid-content.json') as fh:
mock.get(ANY, text=fh.read())
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(set([
Record.new(zone, '', {
'ttl': 3600,
'type': 'SSHFP',
'values': []
}),
Record.new(zone, '_srv._tcp', {
'ttl': 600,
'type': 'SRV',
'values': []
}),
Record.new(zone, 'naptr', {
'ttl': 600,
'type': 'NAPTR',
'values': []
}),
]), zone.records)
# NS with sub
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns3.unit.tests.',
'subDomain': 'www3',
'id': 8
})
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns4.unit.tests.',
'subDomain': 'www3',
'id': 9
})
expected.add(Record.new(zone, 'www3', {
'ttl': 700,
'type': 'NS',
'values': ['ns3.unit.tests.', 'ns4.unit.tests.'],
}))
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '10 20 30 foo-1.unit.tests.',
'subDomain': '_srv._tcp',
'id': 10
})
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '40 50 60 foo-2.unit.tests.',
for _type, data in types.items():
if len(data) > 1:
# Multiple data indicates a record with GeoDNS, convert
# them data into the format we need
geo = {}
for d in data:
if 'geo' in d:
geo[d['geo']] = \
d.get('values', None) or d['value']
else:
primary = d
data = primary
data['geo'] = geo
else:
data = data[0]
record = Record.new(zone, name, data, source=self,
lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',
len(zone.records) - before)