How to use the octodns.zone.Zone function in octodns

To help you get started, we’ve selected a few octodns examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github github / octodns / tests / test_octodns_provider_yaml.py View on Github external
def test_zone_directory(self):
        source = SplitYamlProvider(
            'test', join(dirname(__file__), 'config/split'))

        zone = Zone('unit.tests.', [])

        self.assertEqual(
            join(dirname(__file__), 'config/split/unit.tests.'),
            source._zone_directory(zone))
github github / octodns / tests / test_octodns_provider_route53.py View on Github external
provider, stubber = self._get_stubbed_provider()

        list_hosted_zones_resp = {
            'HostedZones': [{
                'Name': 'unit.tests.',
                'Id': 'z42',
                'CallerReference': 'abc',
            }],
            'Marker': 'm',
            'IsTruncated': False,
            'MaxItems': '100',
        }
        stubber.add_response('list_hosted_zones', list_hosted_zones_resp, {})

        # record with geo and no health check returns change
        existing = Zone('unit.tests.', [])
        record = Record.new(existing, 'a', {
            'ttl': 30,
            'type': 'A',
            'value': '1.2.3.4',
            'geo': {
                'NA': ['2.2.3.4'],
            }
        })
        existing.add_record(record)
        list_resource_record_sets_resp = {
            'ResourceRecordSets': [{
                'Name': 'a.unit.tests.',
                'Type': 'A',
                'GeoLocation': {
                    'ContinentCode': 'NA',
                },
github github / octodns / tests / test_octodns_provider_dyn.py View on Github external
plan = provider.plan(self.expected)
                update_mock.assert_not_called()
                provider.apply(plan)
                update_mock.assert_called()
                self.assertFalse(plan.exists)
            add_mock.assert_called()
            # Once for each dyn record (8 Records, 2 of which have dual values)
            self.assertEquals(15, len(add_mock.call_args_list))
        execute_mock.assert_has_calls([call('/Zone/unit.tests/', 'GET', {}),
                                       call('/Zone/unit.tests/', 'GET', {})])
        self.assertEquals(10, len(plan.changes))

        execute_mock.reset_mock()

        # Delete one and modify another
        new = Zone('unit.tests.', [])
        for name, data in (
            ('a', {
                'type': 'A',
                'ttl': 30,
                'value': '2.3.4.5'
            }),
            ('ptr', {
                'type': 'PTR',
                'ttl': 30,
                'value': 'xx.unit.tests.'
            }),
            ('_srv._tcp', {
                'type': 'SRV',
                'ttl': 30,
                'values': [{
                    'priority': 31,
github github / octodns / tests / test_octodns_provider_route53.py View on Github external
def _get_test_plan(self, max_changes):

        provider = Route53Provider('test', 'abc', '123', max_changes)

        # Use the stubber
        stubber = Stubber(provider._conn)
        stubber.activate()

        got = Zone('unit.tests.', [])

        list_hosted_zones_resp = {
            'HostedZones': [],
            'Marker': 'm',
            'IsTruncated': False,
            'MaxItems': '100',
        }
        stubber.add_response('list_hosted_zones', list_hosted_zones_resp,
                             {})

        create_hosted_zone_resp = {
            'HostedZone': {
                'Name': 'unit.tests.',
                'Id': 'z42',
                'CallerReference': 'abc',
            },
github github / octodns / tests / test_octodns_provider_transip.py View on Github external
def make_expected(self):
        expected = Zone('unit.tests.', [])
        source = YamlProvider('test', join(dirname(__file__), 'config'))
        source.populate(expected)
        return expected
github github / octodns / tests / test_octodns_provider_googlecloud.py View on Github external
#
#

from __future__ import absolute_import, division, print_function, \
    unicode_literals

from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider

from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider

from unittest import TestCase
from mock import Mock, patch, PropertyMock

zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
    'ttl': 0,
    'type': 'A',
    'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
    'ttl': 1,
    'type': 'A',
    'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
    'ttl': 9001,
    'type': 'A',
    'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
    'ttl': 2,
    'type': 'A',
github github / octodns / tests / test_octodns_provider_route53.py View on Github external
provider, stubber = self._get_stubbed_provider()

        list_hosted_zones_resp = {
            'HostedZones': [{
                'Name': 'unit.tests.',
                'Id': 'z42',
                'CallerReference': 'abc',
            }],
            'Marker': 'm',
            'IsTruncated': False,
            'MaxItems': '100',
        }
        stubber.add_response('list_hosted_zones', list_hosted_zones_resp, {})

        # record with geo and no health check returns change
        existing = Zone('unit.tests.', [])
        record = Record.new(existing, 'a', {
            'ttl': 30,
            'type': 'A',
            'value': '1.2.3.4',
            'geo': {
                'NA': ['2.2.3.4'],
            }
        })
        existing.add_record(record)
        list_resource_record_sets_resp = {
            'ResourceRecordSets': [{
                # other name
                'Name': 'unit.tests.',
                'Type': 'A',
                'GeoLocation': {
                    'CountryCode': '*',
github github / octodns / tests / test_octodns_provider_dyn.py View on Github external
DummyRecord('1.2.3.6', 1, 90),
            DummyRecord('1.2.3.7', 1, 90),
        ]
        default_response_pool = DummyResponsePool('default', multi_a)
        pool1_response_pool = DummyResponsePool('pool1', multi_a)
        rulesets = [
            DummyRuleset('default', [default_response_pool]),
            DummyRuleset('0:abcdef', [pool1_response_pool], 'geoip', {
                'geoip': {
                    'country': ['US'],
                    'province': ['or'],
                    'region': [14],
                },
            }),
        ]
        zone = Zone('unit.tests.', [])
        td = DummyTrafficDirector(zone.name, rulesets,
                                  [default_response_pool, pool1_response_pool])
        record = provider._populate_dynamic_traffic_director(zone, fqdn, 'A',
                                                             td, rulesets,
                                                             True)
        self.assertTrue(record)
        self.assertEquals('A', record._type)
        self.assertEquals(90, record.ttl)
        self.assertEquals([
            '1.2.3.5',
            '1.2.3.6',
            '1.2.3.7',
        ], record.values)
        self.assertTrue('pool1' in record.dynamic.pools)
        self.assertEquals({
            'fallback': None,
github github / octodns / tests / test_octodns_provider_selectel.py View on Github external
def test_apply(self, fake_http):

        fake_http.get('{}/unit.tests/records/'.format(self.API_URL),
                      json=list())
        fake_http.get('{}/'.format(self.API_URL), json=self.domain)
        fake_http.head('{}/unit.tests/records/'.format(self.API_URL),
                       headers={'X-Total-Count': '0'})
        fake_http.head('{}/'.format(self.API_URL),
                       headers={'X-Total-Count': str(len(self.domain))})
        fake_http.post('{}/100000/records/'.format(self.API_URL), json=list())

        provider = SelectelProvider(123, 'test_token')

        zone = Zone('unit.tests.', [])

        for record in self.expected:
            zone.add_record(record)

        plan = provider.plan(zone)
        self.assertEquals(8, len(plan.changes))
        self.assertEquals(8, provider.apply(plan))
github github / octodns / octodns / provider / base.py View on Github external
def plan(self, desired):
        self.log.info('plan: desired=%s', desired.name)

        existing = Zone(desired.name, desired.sub_zones)
        self.populate(existing, target=True, lenient=True)

        # compute the changes at the zone/record level
        changes = existing.changes(desired, self)

        # allow the provider to filter out false positives
        before = len(changes)
        changes = filter(self._include_change, changes)
        after = len(changes)
        if before != after:
            self.log.info('plan:   filtered out %s changes', before - after)

        # allow the provider to add extra changes it needs
        extra = self._extra_changes(existing, changes)
        if extra:
            self.log.info('plan:   extra changes\n  %s', '\n  '