Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_apply(self):
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
# Create blank zone
with requests_mock() as mock:
mock.post(ANY, status_code=200, text='')
provider.populate(zone)
self.assertEquals(0, len(zone.records))
# Record change failed
with requests_mock() as mock:
mock.post(ANY, status_code=200, text='')
provider.populate(zone)
zone.add_record(Record.new(zone, 'prawf', {
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(0, len(zone.records))
# Check no changes between what we support and what's parsed
# from the unit.tests. config YAML. Also make sure we see the same
# for both after we've thrown away records we don't support
with requests_mock() as mock:
with open('tests/fixtures/mythicbeasts-list.txt') as file_handle:
mock.post(ANY, status_code=200, text=file_handle.read())
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(15, len(zone.records))
self.assertEquals(15, len(self.expected.records))
changes = self.expected.changes(zone, provider)
self.assertEquals(0, len(changes))
from octodns.provider.mythicbeasts import MythicBeastsProvider, \
add_trailing_dot, remove_trailing_dot
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
from octodns.record import Create, Update, Delete, Record
class TestMythicBeastsProvider(TestCase):
expected = Zone('unit.tests.', [])
source = YamlProvider('test_expected', join(dirname(__file__), 'config'))
source.populate(expected)
# Dump anything we don't support from expected
for record in list(expected.records):
if record._type not in MythicBeastsProvider.SUPPORTS:
expected._remove_record(record)
def test_trailing_dot(self):
with self.assertRaises(AssertionError) as err:
add_trailing_dot('unit.tests.')
self.assertEquals('Value already has trailing dot',
err.exception.message)
with self.assertRaises(AssertionError) as err:
remove_trailing_dot('unit.tests')
self.assertEquals('Value already missing trailing dot',
err.exception.message)
self.assertEquals(add_trailing_dot('unit.tests'), 'unit.tests.')
self.assertEquals(remove_trailing_dot('unit.tests.'), 'unit.tests')
def test_populate(self):
provider = None
# Null passwords dict
with self.assertRaises(AssertionError) as err:
provider = MythicBeastsProvider('test', None)
self.assertEquals('Passwords must be a dictionary',
err.exception.message)
# Missing password
with requests_mock() as mock:
mock.post(ANY, status_code=401, text='ERR Not authenticated')
with self.assertRaises(AssertionError) as err:
provider = MythicBeastsProvider('test', dict())
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(
'Missing password for domain: unit.tests',
err.exception.message)
# Failed authentication
with self.assertRaises(Exception) as err:
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(
'Mythic Beasts unauthorized for zone: unit.tests',
err.exception.message)
# Check unmatched lines are ignored
test_data = 'This should not match'
with requests_mock() as mock:
mock.post(ANY, status_code=200, text=test_data)
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(0, len(zone.records))
# Check unsupported records are skipped
test_data = '@ 60 NOOP prawf\n@ 60 SPF prawf prawf prawf'
with requests_mock() as mock:
mock.post(ANY, status_code=200, text=test_data)
provider = MythicBeastsProvider('test', {
'unit.tests.': 'mypassword'
})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
resp = self.records(zone.name)
before = len(zone.records)
exists = False
data = defaultdict(lambda: defaultdict(lambda: {
'raw_values': [],
'name': None,
'zone': None,
}))
exists = True
for line in resp.content.splitlines():
match = MythicBeastsProvider.RE_POPLINE.match(line.decode("utf-8"))
if match is None:
self.log.debug('failed to match line: %s', line)
continue
if match.group(1) == '@':
_name = ''
else:
_name = match.group('name')
_type = match.group('type')
_ttl = int(match.group('ttl'))
_value = match.group('value').strip()
if hasattr(self, '_data_for_{}'.format(_type)):
if _name not in data[_type]:
def _data_for_CAA(_type, data):
ttl = data['raw_values'][0]['ttl']
raw_value = data['raw_values'][0]['value']
match = MythicBeastsProvider.RE_CAA.match(raw_value)
assert match is not None, 'Unable to parse CAA data'
value = {
'flags': match.group('flags'),
'tag': match.group('tag'),
'value': match.group('value'),
}
return MythicBeastsProvider._data_for_single(
'CAA',
{'raw_values': [{'value': value, 'ttl': ttl}]})
def _data_for_SSHFP(_type, data):
ttl = max([raw_values['ttl'] for raw_values in data['raw_values']])
values = []
for raw_value in \
[raw_values['value'] for raw_values in data['raw_values']]:
match = MythicBeastsProvider.RE_SSHFP.match(raw_value)
assert match is not None, 'Unable to parse SSHFP data'
values.append({
'algorithm': match.group('algorithm'),
'fingerprint_type': match.group('fingerprint_type'),
'fingerprint': match.group('fingerprint'),
})
return {
'type': _type,
'values': values,
'ttl': ttl,
}
def _data_for_CNAME(_type, data):
ttl = data['raw_values'][0]['ttl']
value = data['raw_values'][0]['value']
if not value.endswith('.'):
value = '{}.{}'.format(value, data['zone'])
return MythicBeastsProvider._data_for_single(
_type,
{'raw_values': [
{'value': value, 'ttl': ttl}
]})