How to use the octodns.provider.mythicbeasts.MythicBeastsProvider class in octodns

To help you get started, we’ve selected a few octodns examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github github / octodns / tests / test_octodns_provider_mythicbeasts.py View on Github external
def test_apply(self):
        provider = MythicBeastsProvider('test', {
            'unit.tests.': 'mypassword'
        })
        zone = Zone('unit.tests.', [])

        # Create blank zone
        with requests_mock() as mock:
            mock.post(ANY, status_code=200, text='')
            provider.populate(zone)

        self.assertEquals(0, len(zone.records))

        # Record change failed
        with requests_mock() as mock:
            mock.post(ANY, status_code=200, text='')
            provider.populate(zone)
            zone.add_record(Record.new(zone, 'prawf', {
github github / octodns / tests / test_octodns_provider_mythicbeasts.py View on Github external
provider = MythicBeastsProvider('test', {
                'unit.tests.': 'mypassword'
            })
            zone = Zone('unit.tests.', [])
            provider.populate(zone)
            self.assertEquals(0, len(zone.records))

        # Check no changes between what we support and what's parsed
        # from the unit.tests. config YAML. Also make sure we see the same
        # for both after we've thrown away records we don't support
        with requests_mock() as mock:
            with open('tests/fixtures/mythicbeasts-list.txt') as file_handle:
                mock.post(ANY, status_code=200, text=file_handle.read())

            provider = MythicBeastsProvider('test', {
                'unit.tests.': 'mypassword'
            })
            zone = Zone('unit.tests.', [])
            provider.populate(zone)

            self.assertEquals(15, len(zone.records))
            self.assertEquals(15, len(self.expected.records))
            changes = self.expected.changes(zone, provider)
            self.assertEquals(0, len(changes))
github github / octodns / tests / test_octodns_provider_mythicbeasts.py View on Github external
from octodns.provider.mythicbeasts import MythicBeastsProvider, \
    add_trailing_dot, remove_trailing_dot
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
from octodns.record import Create, Update, Delete, Record


class TestMythicBeastsProvider(TestCase):
    """Tests for the Mythic Beasts provider and its trailing-dot helpers."""

    # Expected zone contents, populated once at class-definition time from
    # the 'config' directory that sits next to this test module.
    expected = Zone('unit.tests.', [])
    source = YamlProvider('test_expected', join(dirname(__file__), 'config'))
    source.populate(expected)

    # Dump anything we don't support from expected. Iterate over a copy,
    # because the loop body removes records from the zone.
    for record in list(expected.records):
        if record._type not in MythicBeastsProvider.SUPPORTS:
            expected._remove_record(record)

    def test_trailing_dot(self):
        """Check both helpers reject no-op input and convert valid input."""
        # Compare against str(exception) rather than exception.message:
        # the ``message`` attribute does not exist on Python 3 exceptions.
        with self.assertRaises(AssertionError) as err:
            add_trailing_dot('unit.tests.')
        self.assertEqual('Value already has trailing dot',
                         str(err.exception))

        with self.assertRaises(AssertionError) as err:
            remove_trailing_dot('unit.tests')
        self.assertEqual('Value already missing trailing dot',
                         str(err.exception))

        self.assertEqual(add_trailing_dot('unit.tests'), 'unit.tests.')
        self.assertEqual(remove_trailing_dot('unit.tests.'), 'unit.tests')
github github / octodns / tests / test_octodns_provider_mythicbeasts.py View on Github external
def test_populate(self):
        provider = None

        # Null passwords dict
        with self.assertRaises(AssertionError) as err:
            provider = MythicBeastsProvider('test', None)
        self.assertEquals('Passwords must be a dictionary',
                          err.exception.message)

        # Missing password
        with requests_mock() as mock:
            mock.post(ANY, status_code=401, text='ERR Not authenticated')

            with self.assertRaises(AssertionError) as err:
                provider = MythicBeastsProvider('test', dict())
                zone = Zone('unit.tests.', [])
                provider.populate(zone)
            self.assertEquals(
                'Missing password for domain: unit.tests',
                err.exception.message)

        # Failed authentication
github github / octodns / tests / test_octodns_provider_mythicbeasts.py View on Github external
with self.assertRaises(Exception) as err:
                provider = MythicBeastsProvider('test', {
                    'unit.tests.': 'mypassword'
                })
                zone = Zone('unit.tests.', [])
                provider.populate(zone)
            self.assertEquals(
                'Mythic Beasts unauthorized for zone: unit.tests',
                err.exception.message)

        # Check unmatched lines are ignored
        test_data = 'This should not match'
        with requests_mock() as mock:
            mock.post(ANY, status_code=200, text=test_data)

            provider = MythicBeastsProvider('test', {
                'unit.tests.': 'mypassword'
            })
            zone = Zone('unit.tests.', [])
            provider.populate(zone)
            self.assertEquals(0, len(zone.records))

        # Check unsupported records are skipped
        test_data = '@ 60 NOOP prawf\n@ 60 SPF prawf prawf prawf'
        with requests_mock() as mock:
            mock.post(ANY, status_code=200, text=test_data)

            provider = MythicBeastsProvider('test', {
                'unit.tests.': 'mypassword'
            })
            zone = Zone('unit.tests.', [])
            provider.populate(zone)
github github / octodns / octodns / provider / mythicbeasts.py View on Github external
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
                       target, lenient)

        resp = self.records(zone.name)

        before = len(zone.records)
        exists = False
        data = defaultdict(lambda: defaultdict(lambda: {
            'raw_values': [],
            'name': None,
            'zone': None,
        }))

        exists = True
        for line in resp.content.splitlines():
            match = MythicBeastsProvider.RE_POPLINE.match(line.decode("utf-8"))

            if match is None:
                self.log.debug('failed to match line: %s', line)
                continue

            if match.group(1) == '@':
                _name = ''
            else:
                _name = match.group('name')

            _type = match.group('type')
            _ttl = int(match.group('ttl'))
            _value = match.group('value').strip()

            if hasattr(self, '_data_for_{}'.format(_type)):
                if _name not in data[_type]:
github github / octodns / octodns / provider / mythicbeasts.py View on Github external
def _data_for_CAA(_type, data):
        """Build CAA record data from the first raw value.

        Only the first entry of ``data['raw_values']`` is read. Its value is
        parsed with ``MythicBeastsProvider.RE_CAA`` into flags/tag/value and
        the result is handed to ``MythicBeastsProvider._data_for_single``.
        The record type passed on is always ``'CAA'``; ``_type`` is unused.

        Raises AssertionError if the raw value does not match ``RE_CAA``.
        """
        first = data['raw_values'][0]
        ttl = first['ttl']
        raw_value = first['value']

        parsed = MythicBeastsProvider.RE_CAA.match(raw_value)
        assert parsed is not None, 'Unable to parse CAA data'

        # Copy the three named regex groups into the structured value.
        fields = ('flags', 'tag', 'value')
        value = {field: parsed.group(field) for field in fields}

        single = {'raw_values': [{'value': value, 'ttl': ttl}]}
        return MythicBeastsProvider._data_for_single('CAA', single)
github github / octodns / octodns / provider / mythicbeasts.py View on Github external
def _data_for_SSHFP(_type, data):
        """Build multi-value SSHFP record data from all raw values.

        Every entry of ``data['raw_values']`` is parsed with
        ``MythicBeastsProvider.RE_SSHFP`` into algorithm, fingerprint_type
        and fingerprint. The record TTL is the largest TTL among the entries.

        Returns a dict with keys ``type``, ``values`` and ``ttl``.
        Raises AssertionError if any raw value does not match ``RE_SSHFP``.
        """
        entries = data['raw_values']
        ttl = max(entry['ttl'] for entry in entries)

        # Collect all raw strings up front, before any of them is parsed.
        raw_strings = [entry['value'] for entry in entries]

        fields = ('algorithm', 'fingerprint_type', 'fingerprint')
        values = []
        for raw_string in raw_strings:
            parsed = MythicBeastsProvider.RE_SSHFP.match(raw_string)
            assert parsed is not None, 'Unable to parse SSHFP data'
            values.append({field: parsed.group(field) for field in fields})

        return {'type': _type, 'values': values, 'ttl': ttl}
github github / octodns / octodns / provider / mythicbeasts.py View on Github external
def _data_for_CNAME(_type, data):
        """Build CNAME record data from the first raw value.

        Only the first entry of ``data['raw_values']`` is read. A target that
        does not end in ``'.'`` is extended with ``'.'`` plus ``data['zone']``;
        a target that already ends in ``'.'`` is passed through unchanged.
        The result is handed to ``MythicBeastsProvider._data_for_single``.
        """
        first = data['raw_values'][0]
        ttl = first['ttl']
        target = first['value']

        # data['zone'] is only looked up when the target lacks a trailing dot.
        if target.endswith('.'):
            qualified = target
        else:
            qualified = '{}.{}'.format(target, data['zone'])

        single = {'raw_values': [{'value': qualified, 'ttl': ttl}]}
        return MythicBeastsProvider._data_for_single(_type, single)