Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_descriptor_dereferencing_uri_remote():
    """Check that remote ``schema`` / ``dialect`` URIs in resources are dereferenced.

    Two HTTP(S) endpoints are mocked with JSON bodies; after building a
    ``Package`` the resources must equal the expanded form of the same
    resources with the URIs replaced by the decoded JSON documents.
    """
    # Mocks
    schema_url = 'http://example.com/schema'
    dialect_url = 'https://example.com/dialect'
    httpretty.register_uri(
        httpretty.GET, schema_url, body='{"fields": [{"name": "name"}]}')
    httpretty.register_uri(
        httpretty.GET, dialect_url, body='{"delimiter": ","}')

    # Tests
    package = Package({
        'resources': [
            {'name': 'name1', 'data': 'data', 'schema': schema_url},
            {'name': 'name2', 'data': 'data', 'dialect': dialect_url},
        ],
    })
    actual_resources = package.descriptor['resources']

    # What the resources should look like once the remote documents are inlined
    dereferenced = [
        {'name': 'name1', 'data': 'data', 'schema': {'fields': [{'name': 'name'}]}},
        {'name': 'name2', 'data': 'data', 'dialect': {'delimiter': ','}},
    ]
    expected_resources = [
        helpers.expand_resource_descriptor(resource) for resource in dereferenced
    ]
    assert actual_resources == expected_resources
def __init__(self, descriptor=None, schema=None, default_base_path=None):
# Constructor: loads, dereferences and expands a descriptor, then resolves
# the profile. The enclosing class is not visible in this excerpt.
#
# Parameters:
#   descriptor        -- passed through zip extraction and descriptor retrieval
#   schema            -- deprecated; triggers a UserWarning when not None
#   default_base_path -- used when no base path can be derived from descriptor
#
# Extract from zip
descriptor = self._extract_zip_if_possible(descriptor)
# Get base path
# Falls back to default_base_path when the helper returns a falsy value.
self._base_path = helpers.get_descriptor_base_path(descriptor) or default_base_path
# Process actions
self._descriptor = helpers.retrieve_descriptor(descriptor)
# The return values of the next two helpers are ignored, so they presumably
# mutate self._descriptor in place -- TODO confirm against helpers.
helpers.dereference_data_package_descriptor(self._descriptor, self._base_path)
helpers.expand_data_package_descriptor(self._descriptor)
# Get profile
# Direct indexing: raises KeyError if 'profile' is absent (presumably the
# expansion step above guarantees the key -- verify).
profile = self._descriptor['profile']
# Handle deprecated schema argument
if schema is not None:
warnings.warn(
'Argument "schema" is deprecated. '
'Please use "descriptor.profile" property.',
UserWarning)
if isinstance(schema, six.string_types):
# Legacy aliases 'base' and 'default' are mapped to 'data-package'.
if schema in ['base', 'default']:
schema = 'data-package'
# NOTE(review): the excerpt is cut off here; the body of this branch and
# the rest of the constructor are not visible.
elif schema == 'tabular':
# NOTE(review): this is a fragment -- the enclosing function and the earlier
# branches of this if/elif chain are not visible in this excerpt.
# Single-path case: classify path[0] as remote or local and record its source.
elif len(path) == 1:
# Remote
# The path itself carries a URL scheme listed in config.REMOTE_SCHEMES.
if urlparse(path[0]).scheme in config.REMOTE_SCHEMES:
inspection['source'] = path[0]
inspection['remote'] = True
# The path is relative but the base path is a remote URL.
elif base_path and urlparse(base_path).scheme in config.REMOTE_SCHEMES:
# A trailing slash is ensured so that urljoin appends path[0] to the base
# instead of replacing the base's last path segment.
norm_base_path = base_path if base_path.endswith('/') else base_path + '/'
inspection['source'] = urljoin(norm_base_path, path[0])
inspection['remote'] = True
# Local
else:
# Path is not safe
if not helpers.is_safe_path(path[0]):
raise exceptions.DataPackageException(
'Local path "%s" is not safe' % path[0])
# Not base path
# A local path can only be resolved relative to a base path.
if not base_path:
raise exceptions.DataPackageException(
'Local path "%s" requires base path' % path[0])
inspection['source'] = os.path.join(base_path, path[0])
inspection['local'] = True
# Inspect
# 'format' is the file extension without the leading dot, 'name' is the
# basename without the extension; 'tabular' is set when the format is
# listed in config.TABULAR_FORMATS.
filename = os.path.basename(path[0])
inspection['format'] = os.path.splitext(filename)[1][1:]
inspection['name'] = os.path.splitext(filename)[0]
inspection['tabular'] = inspection['format'] in config.TABULAR_FORMATS
def __build(self):
# Rebuilds derived state from the current descriptor: expands it, snapshots
# a copy as the "next" descriptor, instantiates the profile and validates.
#
# Raises exceptions.ValidationError when validation fails and the instance
# is in strict mode; otherwise the errors are only stored.
#
# Process descriptor
expand = helpers.expand_package_descriptor
self.__current_descriptor = expand(self.__current_descriptor)
# Deep copy so the next descriptor shares no nested objects with the
# current one.
self.__next_descriptor = deepcopy(self.__current_descriptor)
# Instantiate profile
self.__profile = Profile(self.__current_descriptor.get('profile'))
# Validate descriptor
try:
self.__profile.validate(self.__current_descriptor)
# Successful validation resets the error list.
self.__errors = []
except exceptions.ValidationError as exception:
# Errors are always recorded; they are re-raised only in strict mode.
self.__errors = exception.errors
if self.__strict:
raise exception
# Update resource
# NOTE(review): the excerpt ends here; the resource-update step is not visible.
def __get_integrity(self):
    """Return the integrity information recorded in the current descriptor.

    The result is a dict with two keys:
    ``'size'`` -- the descriptor's ``'bytes'`` value (``None`` if absent);
    ``'hash'`` -- the result of ``helpers.extract_sha256_hash`` applied to
    the descriptor's ``'hash'`` value.
    """
    descriptor = self.__current_descriptor
    declared_size = descriptor.get('bytes')
    sha256_hash = helpers.extract_sha256_hash(descriptor.get('hash'))
    return {'size': declared_size, 'hash': sha256_hash}