Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_check_integrity():
    """A resource built from a private copy of DESCRIPTOR passes its integrity check."""
    # Copy first so the shared module-level DESCRIPTOR is never handed to Resource.
    resource = Resource(deepcopy(DESCRIPTOR))
    assert resource.check_integrity()
def test_source_multipart_local_bad_not_safe_traversing():
    """Constructing a multipart resource whose chunk path contains ``..`` raises."""
    unsafe_descriptor = {
        'name': 'name',
        # The second chunk path climbs out of the base path.
        'path': ['chunk1.csv', '../fixtures/chunk2.csv'],
    }
    with pytest.raises(exceptions.DataPackageException):
        # Construction itself is expected to raise, so the result is not kept.
        Resource(unsafe_descriptor, base_path='data')
def test_descriptor_table_tabular_inline():
    """Inline tabular data is readable as keyed rows through ``resource.table``.

    The non-ASCII name was stored as mojibake (UTF-8 bytes decoded as
    cp1252); it is restored to the intended text in both the inline data
    and the expected rows so the two stay consistent.
    """
    descriptor = {
        'name': 'name',
        'profile': 'tabular-data-resource',
        'data': [
            ['id', 'name'],
            ['1', 'english'],
            ['2', '中国人'],
        ],
        'schema': 'resource_schema.json',
    }
    resource = Resource(descriptor, base_path='data')
    # Inline ids are strings but the expected ids are ints -- presumably the
    # schema file declares ``id`` as an integer field (file not visible here).
    assert resource.table.read(keyed=True) == [
        {'id': 1, 'name': 'english'},
        {'id': 2, 'name': '中国人'},
    ]
def test_descriptor_table_tabular_multipart_local():
    """Two local CSV chunks are read as one table of keyed rows.

    The expected non-ASCII name was stored as mojibake (UTF-8 bytes decoded
    as cp1252); it is restored to the intended text, since the value is
    compared against data read from the chunk files.
    """
    descriptor = {
        'name': 'name',
        'profile': 'tabular-data-resource',
        'path': ['chunk1.csv', 'chunk2.csv'],
        'schema': 'resource_schema.json',
    }
    resource = Resource(descriptor, base_path='data')
    # NOTE(review): assumes the chunk files under ``data`` hold these two
    # rows encoded as UTF-8 -- confirm against the fixtures.
    assert resource.table.read(keyed=True) == [
        {'id': 1, 'name': 'english'},
        {'id': 2, 'name': '中国人'},
    ]
def test_descriptor_expand_tabular_schema():
    """A tabular resource's descriptor is expanded with schema defaults.

    The input schema names a single field only; the resulting descriptor is
    expected to carry ``type``/``format`` on the field and a
    ``missingValues`` entry on the schema.
    """
    descriptor = {
        'name': 'name',
        'data': 'data',
        'profile': 'tabular-data-resource',
        'schema': {
            'fields': [{'name': 'name'}],
        },
    }
    resource = Resource(descriptor)
    assert resource.descriptor == {
        'name': 'name',
        'data': 'data',
        'profile': 'tabular-data-resource',
        'schema': {
            'fields': [{'name': 'name', 'type': 'string', 'format': 'default'}],
            'missingValues': [''],
        },
    }  # closing brace of the expected descriptor was missing in the original
def test_source_multipart_remote():
    """A list of HTTP chunk URLs yields a remote, multipart resource source."""
    chunk_urls = (
        'http://example.com/chunk1.csv',
        'http://example.com/chunk2.csv',
    )
    # Independent list copies: the descriptor and the expectation never share
    # a list object.
    resource = Resource({'name': 'name', 'path': list(chunk_urls)})
    assert resource.source == list(chunk_urls)
    # Equality (not identity) comparisons are kept exactly as written.
    assert resource.remote == True
    assert resource.multipart == True
def test_read_integrity_hash_error():
    """Reading with ``integrity=True`` raises when the declared hash is wrong.

    The error message must contain the digest taken from the unmodified
    DESCRIPTOR (its ``hash`` with the ``sha256:`` prefix removed).
    """
    tampered = deepcopy(DESCRIPTOR)
    tampered['bytes'] = None
    tampered['hash'] += 'a'  # corrupt the declared hash
    resource = Resource(tampered)
    with pytest.raises(exceptions.IntegrityError) as excinfo:
        resource.read(integrity=True)
    # Checked after the ``with`` block: inside it the line would never run.
    untouched_digest = DESCRIPTOR['hash'].replace('sha256:', '')
    assert untouched_digest in str(excinfo.value)
def _update_resources(self, current_resources, descriptor, base_path):
resources_dicts = descriptor.get('resources')
new_resources = []
if resources_dicts is not None:
for resource_dict in resources_dicts:
resource = [res for res in current_resources
if res.descriptor == resource_dict]
if not resource:
resource = [Resource(resource_dict, base_path)]
new_resources.append(resource[0])
return tuple(new_resources)
# NOTE(review): fragment of a method body -- the enclosing ``def``/class header
# and the original indentation are not visible here. The nesting described in
# the comments below is inferred from the statements and should be confirmed.
# Validate the current descriptor against the profile; a clean run resets the
# stored error list.
try:
self.__profile.validate(self.__current_descriptor)
self.__errors = []
except exceptions.ValidationError as exception:
# Keep the validation errors; the exception is re-raised when the strict flag
# is set (presumably only then -- nesting not visible).
self.__errors = exception.errors
if self.__strict:
raise exception
# Update resource
descriptors = self.__current_descriptor.get('resources', [])
# Drop trailing resources beyond the number of descriptors.
self.__resources = self.__resources[:len(descriptors)]
# Pair a snapshot copy of the kept resources with the descriptors; with
# zip_longest, a descriptor that has no resource yet is paired with None.
iterator = enumerate(six.moves.zip_longest(list(self.__resources), descriptors))
for index, (resource, descriptor) in iterator:
# Build a fresh Resource when the slot is empty, the descriptor differs, or
# the resource's schema has foreign keys.
if (not resource or resource.descriptor != descriptor or
(resource.schema and resource.schema.foreign_keys)):
updated_resource = Resource(descriptor,
strict=self.__strict,
base_path=self.__base_path,
storage=self.__storage,
package=self)
# No existing resource at this position: append; otherwise replace in place.
if not resource:
self.__resources.append(updated_resource)
else:
self.__resources[index] = updated_resource