Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Handle deprecated default_base_path argument
if default_base_path is not None:
warnings.warn(
'Argument "default_base_path" is deprecated. '
'Please use "base_path" argument.',
UserWarning)
base_path = default_base_path
# Extract from zip
tempdir, descriptor = _extract_zip_if_possible(descriptor)
if tempdir:
self.__tempdir = tempdir
# Get base path
if base_path is None:
base_path = helpers.get_descriptor_base_path(descriptor)
# Instantiate storage
if storage and not isinstance(storage, Storage):
storage = Storage.connect(storage, **options)
# Get descriptor from storage
if storage and not descriptor:
descriptor = {'resources': []}
for bucket in storage.buckets:
descriptor['resources'].append({'path': bucket})
# Process descriptor
descriptor = helpers.retrieve_descriptor(descriptor)
descriptor = helpers.dereference_package_descriptor(descriptor, base_path)
# Handle deprecated resource.path/url
def __init__(self, descriptor=None, schema=None, default_base_path=None):
# Extract from zip
descriptor = self._extract_zip_if_possible(descriptor)
# Get base path
self._base_path = helpers.get_descriptor_base_path(descriptor) or default_base_path
# Process actions
self._descriptor = helpers.retrieve_descriptor(descriptor)
helpers.dereference_data_package_descriptor(self._descriptor, self._base_path)
helpers.expand_data_package_descriptor(self._descriptor)
# Get profile
profile = self._descriptor['profile']
# Handle deprecated schema argument
if schema is not None:
warnings.warn(
'Argument "schema" is deprecated. '
'Please use "descriptor.profile" property.',
UserWarning)
if isinstance(schema, six.string_types):
def __init__(self, descriptor={}, base_path=None, strict=False, storage=None,
# Internal
package=None, **options):
# Get base path
if base_path is None:
base_path = helpers.get_descriptor_base_path(descriptor)
# Instantiate storage
if storage and not isinstance(storage, Storage):
storage = Storage.connect(storage, **options)
# Process descriptor
descriptor = helpers.retrieve_descriptor(descriptor)
descriptor = helpers.dereference_resource_descriptor(descriptor, base_path)
# Handle deprecated resource.url (superseded by resource.path)
if descriptor.get('url'):
warnings.warn(
'Resource property "url: " is deprecated. '
'Please use "path: " instead.',
UserWarning)
descriptor['path'] = descriptor['url']