Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_zone_directory(self):
source = SplitYamlProvider(
'test', join(dirname(__file__), 'config/split'))
zone = Zone('unit.tests.', [])
self.assertEqual(
join(dirname(__file__), 'config/split/unit.tests.'),
source._zone_directory(zone))
provider, stubber = self._get_stubbed_provider()
list_hosted_zones_resp = {
'HostedZones': [{
'Name': 'unit.tests.',
'Id': 'z42',
'CallerReference': 'abc',
}],
'Marker': 'm',
'IsTruncated': False,
'MaxItems': '100',
}
stubber.add_response('list_hosted_zones', list_hosted_zones_resp, {})
# record with geo and no health check returns change
existing = Zone('unit.tests.', [])
record = Record.new(existing, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
'geo': {
'NA': ['2.2.3.4'],
}
})
existing.add_record(record)
list_resource_record_sets_resp = {
'ResourceRecordSets': [{
'Name': 'a.unit.tests.',
'Type': 'A',
'GeoLocation': {
'ContinentCode': 'NA',
},
plan = provider.plan(self.expected)
update_mock.assert_not_called()
provider.apply(plan)
update_mock.assert_called()
self.assertFalse(plan.exists)
add_mock.assert_called()
# Once for each dyn record (8 Records, 2 of which have dual values)
self.assertEquals(15, len(add_mock.call_args_list))
execute_mock.assert_has_calls([call('/Zone/unit.tests/', 'GET', {}),
call('/Zone/unit.tests/', 'GET', {})])
self.assertEquals(10, len(plan.changes))
execute_mock.reset_mock()
# Delete one and modify another
new = Zone('unit.tests.', [])
for name, data in (
('a', {
'type': 'A',
'ttl': 30,
'value': '2.3.4.5'
}),
('ptr', {
'type': 'PTR',
'ttl': 30,
'value': 'xx.unit.tests.'
}),
('_srv._tcp', {
'type': 'SRV',
'ttl': 30,
'values': [{
'priority': 31,
def _get_test_plan(self, max_changes):
provider = Route53Provider('test', 'abc', '123', max_changes)
# Use the stubber
stubber = Stubber(provider._conn)
stubber.activate()
got = Zone('unit.tests.', [])
list_hosted_zones_resp = {
'HostedZones': [],
'Marker': 'm',
'IsTruncated': False,
'MaxItems': '100',
}
stubber.add_response('list_hosted_zones', list_hosted_zones_resp,
{})
create_hosted_zone_resp = {
'HostedZone': {
'Name': 'unit.tests.',
'Id': 'z42',
'CallerReference': 'abc',
},
def make_expected(self):
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
return expected
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
provider, stubber = self._get_stubbed_provider()
list_hosted_zones_resp = {
'HostedZones': [{
'Name': 'unit.tests.',
'Id': 'z42',
'CallerReference': 'abc',
}],
'Marker': 'm',
'IsTruncated': False,
'MaxItems': '100',
}
stubber.add_response('list_hosted_zones', list_hosted_zones_resp, {})
# record with geo and no health check returns change
existing = Zone('unit.tests.', [])
record = Record.new(existing, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
'geo': {
'NA': ['2.2.3.4'],
}
})
existing.add_record(record)
list_resource_record_sets_resp = {
'ResourceRecordSets': [{
# other name
'Name': 'unit.tests.',
'Type': 'A',
'GeoLocation': {
'CountryCode': '*',
DummyRecord('1.2.3.6', 1, 90),
DummyRecord('1.2.3.7', 1, 90),
]
default_response_pool = DummyResponsePool('default', multi_a)
pool1_response_pool = DummyResponsePool('pool1', multi_a)
rulesets = [
DummyRuleset('default', [default_response_pool]),
DummyRuleset('0:abcdef', [pool1_response_pool], 'geoip', {
'geoip': {
'country': ['US'],
'province': ['or'],
'region': [14],
},
}),
]
zone = Zone('unit.tests.', [])
td = DummyTrafficDirector(zone.name, rulesets,
[default_response_pool, pool1_response_pool])
record = provider._populate_dynamic_traffic_director(zone, fqdn, 'A',
td, rulesets,
True)
self.assertTrue(record)
self.assertEquals('A', record._type)
self.assertEquals(90, record.ttl)
self.assertEquals([
'1.2.3.5',
'1.2.3.6',
'1.2.3.7',
], record.values)
self.assertTrue('pool1' in record.dynamic.pools)
self.assertEquals({
'fallback': None,
def test_apply(self, fake_http):
fake_http.get('{}/unit.tests/records/'.format(self.API_URL),
json=list())
fake_http.get('{}/'.format(self.API_URL), json=self.domain)
fake_http.head('{}/unit.tests/records/'.format(self.API_URL),
headers={'X-Total-Count': '0'})
fake_http.head('{}/'.format(self.API_URL),
headers={'X-Total-Count': str(len(self.domain))})
fake_http.post('{}/100000/records/'.format(self.API_URL), json=list())
provider = SelectelProvider(123, 'test_token')
zone = Zone('unit.tests.', [])
for record in self.expected:
zone.add_record(record)
plan = provider.plan(zone)
self.assertEquals(8, len(plan.changes))
self.assertEquals(8, provider.apply(plan))
def plan(self, desired):
self.log.info('plan: desired=%s', desired.name)
existing = Zone(desired.name, desired.sub_zones)
self.populate(existing, target=True, lenient=True)
# compute the changes at the zone/record level
changes = existing.changes(desired, self)
# allow the provider to filter out false positives
before = len(changes)
changes = filter(self._include_change, changes)
after = len(changes)
if before != after:
self.log.info('plan: filtered out %s changes', before - after)
# allow the provider to add extra changes it needs
extra = self._extra_changes(existing, changes)
if extra:
self.log.info('plan: extra changes\n %s', '\n '