Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_descriptor_dereference():
descriptor = 'data/resource_with_dereferencing.json'
resource = Resource(descriptor)
assert resource.descriptor == expand({
'name': 'name',
'data': 'data',
'schema': {'fields': [{'name': 'name'}]},
'dialect': {'delimiter': ','},
'dialects': {'main': {'delimiter': ','}},
})
def test_descriptor_dereferencing_uri_pointer(self):
descriptor = {
'resources': [
{'name': 'name1', 'data': 'data', 'schema': '#/schemas/main'},
{'name': 'name2', 'data': 'data', 'dialect': '#/dialects/0'},
],
'schemas': {'main': {'fields': [{'name': 'name'}]}},
'dialects': [{'delimiter': ','}],
}
dp = datapackage.DataPackage(descriptor)
assert dp.descriptor['resources'] == list(map(helpers.expand_resource_descriptor, [
{'name': 'name1', 'data': 'data', 'schema': {'fields': [{'name': 'name'}]}},
{'name': 'name2', 'data': 'data', 'dialect': {'delimiter': ','}},
]))
def test_expand_resource_descriptor_doesnt_add_default_encoding():
descriptor = {}
expanded_descriptor = helpers.expand_resource_descriptor(descriptor)
assert 'encoding' not in descriptor
def test_descriptor_retrieve_url(patch_get):
descriptor = 'http://example.com/descriptor.json'
descriptor_contents = {
'name': 'name',
'data': 'data',
}
# Mocks
patch_get(descriptor, body=json.dumps(descriptor_contents))
# Tests
actual = Resource(descriptor).descriptor
expect = expand(descriptor_contents)
assert actual == expect
def test_descriptor_retrieve_dict():
descriptor = {
'name': 'name',
'data': 'data',
}
actual = Resource(descriptor).descriptor
expect = expand(descriptor)
assert actual == expect
def test_descriptor_retrieve_path():
descriptor = 'data/data-resource.json'
actual = Resource(descriptor).descriptor
expect = expand(json.load(io.open(descriptor, encoding='utf-8')))
assert actual == expect
def test_descriptor_dereference_local():
descriptor = {
'name': 'name',
'data': 'data',
'schema': 'table_schema.json',
}
resource = Resource(descriptor, base_path='data')
assert resource.descriptor == expand({
'name': 'name',
'data': 'data',
'schema': {'fields': [{'name': 'name'}]},
})
def test_descriptor_dereferencing_uri():
package = Package('data/datapackage_with_dereferencing.json')
assert package.descriptor['resources'] == list(map(helpers.expand_resource_descriptor, [
{'name': 'name1', 'data': 'data', 'schema': {'fields': [{'name': 'name'}]}},
{'name': 'name2', 'data': 'data', 'dialect': {'delimiter': ','}},
]))
def __build(self):
# Process descriptor
expand = helpers.expand_resource_descriptor
self.__current_descriptor = expand(self.__current_descriptor)
self.__next_descriptor = deepcopy(self.__current_descriptor)
# Inspect source
self.__source_inspection = _inspect_source(
self.__current_descriptor.get('data'),
self.__current_descriptor.get('path'),
self.__base_path,
self.__storage)
# Instantiate profile
self.__profile = Profile(self.__current_descriptor.get('profile'))
# Validate descriptor
try:
self.__profile.validate(self.__current_descriptor)