Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build one change of each kind (Create, Delete, Update) and wrap them in a
# single Plan so make_cautious() can be checked against it.
new = Record.new(zone, 'a', {
    'geo': {
        'AF': ['5.5.5.5'],
        'NA-US': ['6.6.6.6']
    },
    'ttl': 300,
    'type': 'A',
    # This leaves one, swaps ones, and adds one
    'values': ['2.2.2.2', '3.3.3.3', '4.4.4.4'],
}, simple)
create = Create(Record.new(zone, 'b', {
    'ttl': 3600,
    'type': 'CNAME',
    'value': 'foo.unit.tests.'
}, simple))
update = Update(existing, new)
delete = Delete(new)
changes = [create, delete, update]
plan = Plan(zone, zone, changes)

# Before make_cautious() the new records still carry their configured TTLs.
# (assertEqual is used because the assertEquals alias is deprecated and was
# removed from unittest in Python 3.12.)
self.assertEqual(3600, create.new.ttl)
self.assertEqual(300, update.new.ttl)
plan.make_cautious()
# Afterwards both the created and the updated record report a TTL of 60.
self.assertEqual(60, create.new.ttl)
self.assertEqual(60, update.new.ttl)
# Fixture: a geo-enabled A record used as the "new" side of the Update and as
# the target of the Delete below.
# NOTE(review): `zone`, `simple` and `existing` come from code outside this
# fragment; `simple` is presumably the source/provider object - confirm.
new = Record.new(zone, 'a', {
'geo': {
'AF': ['5.5.5.5'],
'NA-US': ['6.6.6.6']
},
'ttl': 300,
'type': 'A',
# This leaves one, swaps ones, and adds one
'values': ['2.2.2.2', '3.3.3.3', '4.4.4.4'],
}, simple)
# A CNAME record wrapped in a Create change.
create = Create(Record.new(zone, 'b', {
'ttl': 60,
'type': 'CNAME',
'value': 'foo.unit.tests.'
}, simple))
update = Update(existing, new)
delete = Delete(new)
changes = [create, delete, update]
# Two (simple, Plan) pairs; both plans are built from the same zone and the
# same `changes` list.
plans = [
(simple, Plan(zone, zone, changes)),
(simple, Plan(zone, zone, changes)),
]
class TestPlanHtml(TestCase):
    """Tests for the PlanHtml plan output."""

    log = getLogger('TestPlanHtml')

    def test_empty(self):
        """Running with no plans writes the fixed "no changes" message."""
        out = StringIO()
        PlanHtml('html').run([], fh=out)
        # assertEqual, not assertEquals: the latter is a deprecated alias
        # that was removed from unittest in Python 3.12.
        self.assertEqual('<b>No changes were planned</b>', out.getvalue())
# Add something and delete something
# Builds an Update that replaces a two-value NS record at the zone apex with
# a single-value one, wraps it in a Plan and hands it to the provider.
# NOTE(review): `provider` is defined outside this fragment.
zone = Zone('unit.tests.', [])
existing = Record.new(zone, '', {
'ttl': 300,
'type': 'NS',
# This matches the zone data above, one to delete, one to leave
'values': ['ns1.foo.bar.', 'ns2.foo.bar.'],
})
new = Record.new(zone, '', {
'ttl': 300,
'type': 'NS',
# This leaves one and deletes one
'value': 'ns2.foo.bar.',
})
change = Update(existing, new)
plan = Plan(zone, zone, [change])
# Calls the provider's private _apply directly with the plan.
provider._apply(plan)
provider._request.assert_has_calls([
call('GET', '/zones', params={'page': 1}),
call('POST', '/zones',
data={'jump_start': False, 'name': 'unit.tests'}),
call('DELETE', '/zones/ff12ab34cd5611334422ab3322997650/'
'dns_records/fc12ab34cd5611334422ab3322997653')
def test_mod_dynamic_update_geo_dynamic(self, _):
    """Updating a geo record to a dynamic one regenerates the rulesets.

    The second positional argument is unused here (presumably a mock
    injected by a patch decorator outside this view - confirm).
    """
    provider = DynProvider('test', 'cust', 'user', 'pass',
                           traffic_directors_enabled=True)

    # convert a geo record to a dynamic td

    # pre-populate the cache with our mock td
    provider._traffic_directors = {
        'unit.tests.': {
            'A': 42,
        }
    }
    # mock _mod_dynamic_rulesets
    provider._mod_dynamic_rulesets = MagicMock()

    change = Update(self.geo_a_record, self.dynamic_a_record)
    provider._mod_dynamic_Update(None, change)
    # still in cache; assertIn reports the container contents on failure,
    # unlike assertTrue('A' in ...)
    self.assertIn('A', provider.traffic_directors['unit.tests.'])
    # should have seen 1 gen call
    provider._mod_dynamic_rulesets.assert_called_once_with(42, change)
'TTL': 60,
'Type': 'A'}
}],
'Comment': ANY
},
'HostedZoneId': 'z42'
}
# Queue the stubbed reply for change_resource_record_sets, passing
# change_resource_record_sets_params as the third argument.
# NOTE(review): presumably `stubber` is a botocore Stubber, in which case
# the third argument is the expected request parameters - confirm.
stubber.add_response('change_resource_record_sets',
                     {'ChangeInfo': {
                         'Id': 'id',
                         'Status': 'PENDING',
                         'SubmittedAt': '2017-01-29T01:02:03Z',
                     }}, change_resource_record_sets_params)

plan = provider.plan(self.expected)
# Exactly one change is planned and it is an Update. assertEqual replaces
# the deprecated assertEquals alias (removed in Python 3.12).
self.assertEqual(1, len(plan.changes))
self.assertIsInstance(plan.changes[0], Update)
# apply() returns 1 for this plan.
self.assertEqual(1, provider.apply(plan))
stubber.assert_no_pending_responses()
},
'octodns': {
'healthcheck': {
'host': 'foo.bar',
'path': '/_ready'
}
},
'ttl': 60,
'type': 'A',
'value': '1.2.3.4',
})
desired.add_record(record)
# With no pending changes, _extra_changes returns a single Update for the
# record just added. assertEqual replaces the deprecated assertEquals alias
# (removed in Python 3.12).
extra = provider._extra_changes(desired=desired, changes=[])
self.assertEqual(1, len(extra))
extra = extra[0]
self.assertIsInstance(extra, Update)
self.assertEqual(record, extra.record)

# missing health check
desired = Zone('unit.tests.', [])
record = Record.new(desired, 'geo', {
    'geo': {
        'NA': ['1.2.3.4'],
    },
    'ttl': 60,
    'type': 'A',
    'value': '1.2.3.4',
})
desired.add_record(record)
extra = provider._extra_changes(desired=desired, changes=[])
self.assertEqual(1, len(extra))
extra = extra[0]
provider = DynProvider('test', 'cust', 'user', 'pass',
                       traffic_directors_enabled=True)

# update of an existing dynamic td

# pre-populate the cache with our mock td
provider._traffic_directors = {
    'unit.tests.': {
        'A': 42,
    }
}
# mock _mod_dynamic_rulesets
provider._mod_dynamic_rulesets = MagicMock()

# The same dynamic record is used for both sides of the Update.
dyn = self.dynamic_a_record
change = Update(dyn, dyn)
provider._mod_dynamic_Update(None, change)
# still in cache; assertIn reports the container contents on failure,
# unlike assertTrue('A' in ...)
self.assertIn('A', provider.traffic_directors['unit.tests.'])
# should have seen 1 gen call
provider._mod_dynamic_rulesets.assert_called_once_with(42, change)
stubber.assert_no_pending_responses()
# Each step below queues the delete_health_check response(s) the provider is
# expected to consume, invokes one _mod_* method, then checks that no queued
# response is left over.
# gc through _mod_Create
stubber.add_response('delete_health_check', {}, {
'HealthCheckId': '44',
})
change = Create(record)
provider._mod_Create(change)
stubber.assert_no_pending_responses()
# gc through _mod_Update
stubber.add_response('delete_health_check', {}, {
'HealthCheckId': '44',
})
# first record is ignored for our purposes, we have to pass something
change = Update(record, record)
# NOTE(review): the step is labelled "_mod_Update" but the call below is
# _mod_Create with an Update change - looks like a copy/paste slip, so the
# Update path may not actually be exercised here. Confirm and switch to
# provider._mod_Update(change) if that method takes the same argument.
provider._mod_Create(change)
stubber.assert_no_pending_responses()
# gc through _mod_Delete, expect 3 to go away, can't check order
# b/c it's not deterministic
stubber.add_response('delete_health_check', {}, {
'HealthCheckId': ANY,
})
stubber.add_response('delete_health_check', {}, {
'HealthCheckId': ANY,
})
stubber.add_response('delete_health_check', {}, {
'HealthCheckId': ANY,
})
change = Delete(record)
provider._mod_Delete(change)
# Mocked change object whose `status` property reads "pending" eleven times
# and then "done".
status_mock = Mock()
return_values_for_status = iter(
    ["pending"] * 11 + ['done', 'done'])
# Hand the iterator itself to side_effect: mock yields the next item on each
# access. The previous `return_values_for_status.next` bound method exists
# only on Python 2 iterators and raises AttributeError on Python 3.
type(status_mock).status = PropertyMock(
    side_effect=return_values_for_status)
gcloud_zone_mock.changes = Mock(return_value=status_mock)

provider = self._get_provider()
provider.gcloud_client = Mock()
provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
desired = Mock()
desired.name = "unit.tests."
changes = [
    Create(create_r),
    Delete(delete_r),
    Update(existing=update_existing_r, new=update_new_r),
]

provider.apply(Plan(
    existing=[update_existing_r, delete_r],
    desired=desired,
    changes=changes,
    exists=True
))

# Collect the first positional argument of every add_record_set call made on
# the mocked change object.
calls_mock = gcloud_zone_mock.changes.return_value
mocked_calls = [
    mock_call[1][0] for mock_call in calls_mock.add_record_set.mock_calls
]
self.assertEqual(mocked_calls, [
DummyResourceRecordSet(
'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
# Collect additional Update changes for records that exist on both sides,
# are not already part of `changes`, and whose proxied status differs.
# NOTE(review): the enclosing function header is outside this view;
# `existing`, `desired` and `changes` are its inputs.
extra_changes = []
# Maps each existing record to itself so that an equal desired record can be
# used as the key to fetch the existing instance.
existing_records = {r: r for r in existing.records}
changed_records = {c.record for c in changes}
for desired_record in desired.records:
if desired_record not in existing.records: # Will be created
continue
elif desired_record in changed_records: # Already being updated
continue
existing_record = existing_records[desired_record]
# Only the proxied flag is compared here; a mismatch forces an Update.
if (self._record_is_proxied(existing_record) !=
self._record_is_proxied(desired_record)):
extra_changes.append(Update(existing_record, desired_record))
return extra_changes