Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): body of a schema-example test method — the enclosing
# `def` (and the tail of the `try` block) fall outside this chunk, so
# the fragment is incomplete; indentation reconstructed.
standard_version = self._find_standard_version(name, version)

# Make sure that the examples in the schema files (and thus the
# ASDF standard document) are valid.
buff = helpers.yaml_to_asdf(
    'example: ' + self.example.strip(), standard_version=standard_version)
ff = AsdfFile(
    uri=util.filepath_to_url(os.path.abspath(self.filename)),
    extensions=TestExtension())

# Fake an external file
ff2 = AsdfFile({'data': np.empty((1024*1024*8), dtype=np.uint8)})
ff._external_asdf_by_uri[
    util.filepath_to_url(
        os.path.abspath(
            os.path.join(
                os.path.dirname(self.filename), 'external.asdf')))] = ff2

# Add some dummy blocks so that the ndarray examples work
for i in range(3):
    b = block.Block(np.zeros((1024*1024*8), dtype=np.uint8))
    b._used = True
    ff.blocks.add(b)
# Only the last block created is marked streamed — presumably deliberate
# (an ASDF file holds at most one streamed block); confirm against the
# full test file.
b._array_storage = "streamed"

try:
    with pytest.warns(None) as w:
        import warnings
        ff._open_impl(ff, buff, mode='rw')
    # Do not tolerate any warnings that occur during schema validation
def _from_tree_tagged_missing_requirements(cls, tree, ctx):
    """A special version of ``AsdfType.from_tree_tagged`` used when the
    required dependencies for an AsdfType are missing.

    Raises
    ------
    TypeError
        Always. This error will be handled by
        ``yamlutil.tagged_tree_to_custom_tree``, which will cause a
        warning to be issued indicating that the tree failed to be
        converted.
    """
    # BUG FIX: the original condition `if len(cls.requires)` pluralized
    # whenever *any* requirement was present, producing e.g.
    # "numpy packages are required" for a single requirement. Pluralize
    # only when there is more than one requirement.
    plural, verb = ('s', 'are') if len(cls.requires) != 1 else ('', 'is')
    message = "{0} package{1} {2} required to instantiate '{3}'".format(
        util.human_list(cls.requires), plural, verb, tree._tag)
    raise TypeError(message)
def __new__(mcls, name, bases, attrs):
    """Construct a new AsdfType subclass.

    Resolves the class's ``types`` entries (importing any given by
    dotted-name string), stubs out ``from_tree_tagged`` when required
    third-party modules are missing, derives ``yaml_tag`` from ``name``,
    and registers the class in the global ``_all_asdftypes`` set.
    """
    requires = mcls._find_in_bases(attrs, bases, 'requires', [])
    if not mcls._has_required_modules(requires):
        # Required modules are missing: replace from_tree_tagged with a
        # stub that raises TypeError, and disable this type's converters.
        attrs['from_tree_tagged'] = classmethod(
            _from_tree_tagged_missing_requirements)
        attrs['types'] = []
        attrs['has_required_modules'] = False
    else:
        attrs['has_required_modules'] = True
        types = mcls._find_in_bases(attrs, bases, 'types', [])
        new_types = []
        for typ in types:
            # Entries given as dotted-name strings are imported lazily
            # here so merely declaring a type doesn't import its module.
            if isinstance(typ, six.string_types):
                typ = util.resolve_name(typ)
            new_types.append(typ)
        attrs['types'] = new_types

    cls = super(AsdfTypeMeta, mcls).__new__(mcls, name, bases, attrs)

    if hasattr(cls, 'name'):
        if isinstance(cls.name, six.string_types):
            if 'yaml_tag' not in attrs:
                cls.yaml_tag = cls.make_yaml_tag(cls.name)
        elif isinstance(cls.name, list):
            pass
        elif cls.name is not None:
            raise TypeError("name must be string or list")

    _all_asdftypes.add(cls)

    # BUG FIX: __new__ must return the constructed class object; the
    # original fell off the end and returned None, which would make
    # every AsdfType subclass definition evaluate to None.
    return cls
def __init__(self, fd, mode, close=False, uri=None):
    """Wrap a real on-disk file object.

    Parameters
    ----------
    fd : file object
        An open file; must support ``fileno()``.
    mode : str
        I/O mode, forwarded to the base class.
    close : bool, optional
        Whether this wrapper owns (and should eventually close) ``fd``.
    uri : str or None, optional
        Resource URI. If None and ``fd.name`` is a string, the URI is
        derived from the file's absolute path.
    """
    super(RealFile, self).__init__(fd, mode, close=close, uri=uri)
    stat = os.fstat(fd.fileno())
    if sys.platform.startswith('win'):  # pragma: no cover
        # There appears to be no reliable way to get block size on
        # Windows, so just choose a reasonable default
        self._blksize = io.DEFAULT_BUFFER_SIZE
    else:
        self._blksize = stat.st_blksize
    self._size = stat.st_size
    if (uri is None and
            isinstance(fd.name, str)):
        # Derive the URI from the underlying file path when not given.
        self._uri = util.filepath_to_url(os.path.abspath(fd.name))
def __init__(self, fd, mode, close=False, uri=None):
    """Wrap a real on-disk file object.

    NOTE(review): this appears to be an earlier (six-based) variant of
    the ``RealFile.__init__`` above, from a different revision of the
    same file.

    Parameters
    ----------
    fd : file object
        An open file; must support ``fileno()``.
    mode : str
        I/O mode, forwarded to the base class.
    close : bool, optional
        Whether this wrapper owns (and should eventually close) ``fd``.
    uri : str or None, optional
        Resource URI. If None and ``fd.name`` is a string, the URI is
        derived from the file's absolute path.
    """
    super(RealFile, self).__init__(fd, mode, close=close, uri=uri)
    stat = os.fstat(fd.fileno())
    if sys.platform.startswith('win'):  # pragma: no cover
        # There appears to be no reliable way to get block size on
        # Windows, so just choose a reasonable default
        self._blksize = io.DEFAULT_BUFFER_SIZE
    else:
        self._blksize = stat.st_blksize
    self._size = stat.st_size
    if (uri is None and
            isinstance(fd.name, six.string_types)):
        # Derive the URI from the underlying file path when not given.
        self._uri = util.filepath_to_url(os.path.abspath(fd.name))
def _update_extension_history(self):
    """Record the extensions used by this file in the tree.

    Ensures ``tree['history']['extensions']`` exists (migrating the
    legacy list-based history layout with a warning), then upserts an
    ``ExtensionMetadata`` entry for every extension reported by the
    type index, attaching software name/version when known.
    """
    if 'history' not in self.tree:
        self.tree['history'] = dict(extensions=[])
    else:
        history = self.tree['history']
        if isinstance(history, list):
            # Support clients who are still using the old history format
            self.tree['history'] = dict(entries=history, extensions=[])
            warnings.warn("The ASDF history format has changed in order to "
                          "support metadata about extensions. History entries "
                          "should now be stored under tree['history']['entries'].")
        elif 'extensions' not in history:
            history['extensions'] = []

    for extension in self.type_index.get_extensions_used():
        class_name = util.get_class_name(extension)
        meta = ExtensionMetadata(extension_class=class_name)
        software = self._extension_metadata.get(class_name)
        if software is not None:
            meta.software = dict(name=software[0], version=software[1])

        recorded = self.tree['history']['extensions']
        for index, existing in enumerate(recorded):
            # Update metadata about this extension if it already exists
            if existing.extension_class == meta.extension_class:
                recorded[index] = meta
                break
        else:
            recorded.append(meta)
def __new__(mcls, name, bases, attrs):
    # NOTE(review): this looks like a later (str-based) variant of the
    # metaclass __new__ earlier in this chunk, with added version
    # normalization. The fragment is cut off after the final `elif`
    # (the next line jumps into an unrelated function) — confirm the
    # missing branch body and return against the full file.
    requires = mcls._find_in_bases(attrs, bases, 'requires', [])
    if not mcls._has_required_modules(requires):
        # Required modules are missing: stub out from_tree_tagged and
        # disable this type's converters.
        attrs['from_tree_tagged'] = classmethod(
            _from_tree_tagged_missing_requirements)
        attrs['types'] = []
        attrs['has_required_modules'] = False
    else:
        attrs['has_required_modules'] = True
        types = mcls._find_in_bases(attrs, bases, 'types', [])
        new_types = []
        for typ in types:
            # Dotted-name strings are resolved (imported) here.
            if isinstance(typ, str):
                typ = util.resolve_name(typ)
            new_types.append(typ)
        attrs['types'] = new_types
    cls = super(ExtensionTypeMeta, mcls).__new__(mcls, name, bases, attrs)
    if hasattr(cls, 'version'):
        if not isinstance(cls.version, (AsdfVersion, AsdfSpec)):
            # Normalize plain version strings to AsdfVersion objects.
            cls.version = AsdfVersion(cls.version)
    if hasattr(cls, 'name'):
        if isinstance(cls.name, str):
            if 'yaml_tag' not in attrs:
                cls.yaml_tag = cls.make_yaml_tag(cls.name)
        elif isinstance(cls.name, list):
            pass
        elif cls.name is not None:
            # NOTE(review): the body of this branch (presumably
            # `raise TypeError("name must be string or list")`, as in
            # the sibling metaclass) is missing from this chunk.
values.append(node_item)
return node
def represent_ordereddict(dumper, data):
    # Represent an OrderedDict as a YAML !!omap node so key order is
    # preserved on output.
    return represent_ordered_mapping(dumper, YAML_OMAP_TAG, data)
# Wire up ordered-mapping support: load !!omap nodes back into ordered
# dicts, and dump OrderedDict instances as !!omap.
AsdfLoader.add_constructor(YAML_OMAP_TAG, ordereddict_constructor)
AsdfDumper.add_representer(OrderedDict, represent_ordereddict)

# ----------------------------------------------------------------------
# Handle numpy scalars
# Register every concrete numpy scalar subclass so numpy floating/integer
# scalars are emitted with the dumper's plain float/int representers.
for scalar_type in util.iter_subclasses(np.floating):
    AsdfDumper.add_representer(scalar_type, AsdfDumper.represent_float)
for scalar_type in util.iter_subclasses(np.integer):
    AsdfDumper.add_representer(scalar_type, AsdfDumper.represent_int)
def custom_tree_to_tagged_tree(tree, ctx):
    """
    Convert a tree, possibly containing custom data types that aren't
    directly representable in YAML, to a tree of basic data types,
    annotated with tags.
    """
    def walker(node):
        # Look up a converter for this node's concrete type against the
        # active ASDF standard version; non-custom nodes (tag is None)
        # are left for the caller to pass through unchanged.
        tag = ctx.type_index.from_custom_type(type(node), ctx.version_string)
        if tag is not None:
            return tag.to_tree_tagged(node, ctx)
    # NOTE(review): this chunk appears truncated here — the original
    # presumably applies `walker` over `tree` (e.g. via a tree-walking
    # helper) and returns the result; confirm against the full file.
def find_or_create_block_for_array(self, arr, ctx):
    """Return a block backing *arr*, preferring an embedded FITS HDU.

    If *arr* is not already an ``NDArrayType`` and shares its base
    buffer with the data of one of our HDUs, that HDU is wrapped in a
    ``_FitsBlock``; otherwise the lookup is delegated to the base
    block manager.
    """
    from .tags.core import ndarray

    if not isinstance(arr, ndarray.NDArrayType):
        target = util.get_array_base(arr)
        # First HDU (if any) whose data shares the same base buffer as
        # the requested array; identity comparison, as in the base impl.
        candidates = (
            hdu for hdu in self._hdulist
            if hdu.data is not None
            and util.get_array_base(hdu.data) is target
        )
        match = next(candidates, None)
        if match is not None:
            return _FitsBlock(match)

    return super(
        _EmbeddedBlockManager, self).find_or_create_block_for_array(arr, ctx)