def copy_directory(self, from_, dest_path, dry_run, log):
log.debug("copy.mkdir", dest=dest_path.parent)
fileutils.mkdir_p(str(dest_path.parent))
        # We don't want to risk partially-copied packages being left on disk, so we copy to a tmp dir in the
        # same folder and then atomically rename it into place.
tmp_dir = Path(tempfile.mkdtemp(prefix='.dea-mv-', dir=str(dest_path.parent)))
try:
tmp_package = tmp_dir.joinpath(from_.name)
log.info("copy.put", src=from_, tmp_dest=tmp_package)
if not dry_run:
shutil.copytree(from_, tmp_package)
log.debug("copy.put.done")
os.rename(tmp_package, dest_path)
log.debug("copy.rename.done")
            # The metadata path should be inside the copied dataset; see the check in the constructor.
assert self.dest_metadata_path.exists()
finally:
log.debug("tmp_dir.rm", tmp_dir=tmp_dir)
def build_pathset(
collection: Collection,
cache_path: Path = None,
log=_LOG) -> dawg.CompletionDAWG:
"""
Build a combined set (in dawg form) of all dataset paths in the given index and filesystem.
Optionally use the given cache directory to cache repeated builds.
"""
locations_cache = cache_path.joinpath(query_name(collection.query), 'locations.dawg') if cache_path else None
if locations_cache:
fileutils.mkdir_p(str(locations_cache.parent))
log = log.bind(collection_name=collection.name)
if locations_cache and not cache_is_too_old(locations_cache):
path_set = dawg.CompletionDAWG()
log.debug("paths.trie.cache.load", file=locations_cache)
path_set.load(str(locations_cache))
else:
log.info("paths.trie.build")
path_set = dawg.CompletionDAWG(
chain(
collection.all_indexed_uris(),
collection.all_uris()
)
)
log.info("paths.trie.done")
        if locations_cache is not None:
            # Persist the freshly-built set so later runs can take the cache branch above.
            path_set.save(str(locations_cache))
    return path_set
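build_pathset leans on the dawg package's CompletionDAWG. A quick sketch of the small part of that API it uses, with made-up URIs:

import dawg

# A CompletionDAWG is an immutable, memory-compact set of strings with fast prefix
# queries, which is why it suits a large set of dataset location URIs.
paths = dawg.CompletionDAWG([
    'file:///g/data/sample/LS8_EXAMPLE/2016/dataset-metadata.yaml',
    'file:///g/data/sample/LS8_EXAMPLE/2017/dataset-metadata.yaml',
])
paths.save('locations.dawg')    # compact binary form, used as the cache file above

reloaded = dawg.CompletionDAWG()
reloaded.load('locations.dawg')  # loading is much cheaper than rebuilding the set
print(reloaded.keys('file:///g/data/sample/LS8_EXAMPLE/2016'))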
def layer(index):
    # Band numbers are 1-based in a multi-band file; a single-band file always uses band 1.
    # `multiband` comes from the enclosing scope in the original driver.
    if multiband:
        return index + 1
    return 1
band_uris = {name: {'layer': layer(index), 'path': uris[index]}
for index, name in enumerate(stat.product.measurements)}
datasets = self._find_source_datasets(stat, uri=None, band_uris=band_uris)
if yaml_filename is None:
yaml_filename = str(output_filenames[0].with_suffix('.yaml'))
# Write to Yaml
if len(datasets) == 1: # I don't think there should ever be more than 1 dataset in here...
            _LOG.info('writing dataset yaml for %s to %s', stat, yaml_filename)
            # Serialise the Dataset objects into YAML documents before writing them out.
            datasets = datasets_to_doc(datasets)
            with fileutils.atomic_save(yaml_filename) as yaml_dst:
                yaml_dst.write(datasets.values[0])
else:
_LOG.error('Unexpected more than 1 dataset %r being written at once, '
'investigate!', datasets)
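The YAML here goes through boltons.fileutils.atomic_save. A small standalone sketch of that context manager (the filename and content are placeholders):

from boltons import fileutils

# atomic_save writes to a temporary part-file next to the destination and only renames
# it into place on a clean exit, so readers never see a half-written document.
with fileutils.atomic_save('dataset.yaml') as f:
    f.write(b'product: example\n')  # the file handle is opened in binary mode by default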
band_uris = {name: {'layer': layer(index), 'path': uris[index]}
for index, name in enumerate(all_measurement_defns)}
datasets = self._find_source_datasets(output_product, uri=None, band_uris=band_uris)
if yaml_filename is None:
yaml_filename = str(output_filenames[0].with_suffix('.yaml'))
# Write to Yaml
if len(datasets) == 1: # I don't think there should ever be more than 1 dataset in here...
_LOG.info('writing dataset yaml for %s to %s', output_product, yaml_filename)
            if aws:
                # Replace any embedded lineage with an empty source_datasets mapping before dumping.
                datasets.values[0].metadata_doc['lineage'] = {'source_datasets': {}}
with open(yaml_filename, 'w') as yaml_dst:
yaml.dump(datasets.values[0].metadata_doc, yaml_dst, default_flow_style=False, Dumper=Dumper)
else:
datasets = datasets_to_doc(datasets)
with fileutils.atomic_save(yaml_filename) as yaml_dst:
yaml_dst.write(datasets.values[0])
else:
_LOG.error('Unexpected more than 1 dataset %r being written at once, '
'investigate!', datasets)
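The aws branch above dumps the metadata document directly with PyYAML. A tiny illustration of that dump call, using an entirely made-up document:

import yaml

doc = {'id': 'example-dataset', 'lineage': {'source_datasets': {}}}
# default_flow_style=False forces block-style YAML rather than inline {...} mappings.
print(yaml.dump(doc, default_flow_style=False))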
def atomic_rename(src, dest):
"""Wrap boltons.fileutils.atomic_rename to allow passing `str` or `pathlib.Path`"""
_LOG.info('renaming %s to %s', src, dest)
fileutils.replace(str(src), str(dest))
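A short usage sketch of the boltons.fileutils.replace call wrapped above (the file names are placeholders):

from pathlib import Path

from boltons import fileutils

src = Path('report.json.tmp')
src.write_text('{"status": "ok"}')
# replace() behaves like os.replace: it overwrites an existing destination, atomically on POSIX.
fileutils.replace(str(src), 'report.json')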
structlog.configure(
    processors=[
        # Human-friendly console rendering on a TTY, plain key=value pairs otherwise.
        CleanConsoleRenderer() if sys.stdout.isatty() else structlog.processors.KeyValueRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
cache = Path(cache_folder)
for collection_name in collections:
collection = NCI_COLLECTIONS[collection_name]
log = _LOG.bind(collection=collection_name)
collection_cache = cache.joinpath(query_name(collection.query))
fileutils.mkdir_p(str(collection_cache))
with AgdcDatasetPathIndex(index, collection.query) as path_index:
for mismatch in find_index_disk_mismatches(log,
path_index,
collection.base_path,
collection.offset_pattern,
cache_path=collection_cache):
                # One tab-separated line per mismatch: collection, mismatch type, dataset id, URI.
                click.echo('\t'.join(map(str, (
collection_name,
strutils.camel2under(mismatch.__class__.__name__),
mismatch.dataset.id,
mismatch.uri
))))
if not dry_run:
log.info('mismatch.fix', mismatch=mismatch)
mismatch.fix(path_index)
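The report above snake_cases each mismatch class name with boltons.strutils.camel2under. For example (the class name below is only an illustration):

from boltons import strutils

# camel2under turns a CamelCase class name into the snake_case label printed above.
print(strutils.camel2under('LocationMissingOnDisk'))  # -> 'location_missing_on_disk'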
def install_package(self, source: Union[LocalDependencySource, PackageDependencySource]) -> Any:
with source as files:
if source.is_local:
self.log.debug(f"installing {source} as local")
return
if isinstance(files, list):
self.log.debug(f"installing {source} as module(s)")
            # Flatten the list of stub-file tuples and pair each file with its destination path.
            file_paths = [(f, (self.pkg_path / f.name)) for f in list(sum(files, ()))]
for paths in file_paths:
shutil.move(*paths) # overwrites if existing
return file_paths
self.log.debug(f'installing {source} as package')
pkg_path = self.pkg_path / source.package.name
return fileutils.copytree(files, pkg_path)
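install_package finishes with boltons.fileutils.copytree. A brief sketch of that call, assuming purely illustrative paths:

from boltons import fileutils

# Unlike shutil.copytree, boltons' copytree does not fail if the destination directory
# already exists; it creates whatever is missing and copies the files into place.
fileutils.copytree('stubs/micropython', 'project/.micropy/micropython')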