Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if config_to_load is not None:
pass
# TODO: Switch this message to a debug message using _LOGGER
self.debug("\nUsing custom config file: {}".format(config_to_load))
else:
# No custom config file specified. Check for default
default_config = default_pipeline_config(sys.argv[0])
if os.path.isfile(default_config):
config_to_load = default_config
self.debug("Using default pipeline config file: {}".
format(config_to_load))
# Finally load the config we found.
if config_to_load is not None:
self.debug("\nLoading config file: {}\n".format(config_to_load))
self.config = AttMapEcho(load_yaml(config_to_load))
else:
self.debug("No config file")
self.config = None
def __init__(self, config_file=None, pm=None):
    """
    Set up the toolkit from an optional YAML config and pipeline manager.

    :param str config_file: path to a YAML file; when given, the result of
        ``load_yaml(config_file)`` is passed to the parent constructor,
        otherwise the parent constructor receives None
    :param pm: optional pipeline manager; when given it is stored as
        ``self.pm`` and its ``config.tools`` / ``config.parameters`` are
        reused if those attributes exist
    """
    # Parse the YAML (if any) into this object's attributes via the parent.
    super(NGSTk, self).__init__(
        None if config_file is None else load_yaml(config_file))

    # Keep a link to the pipeline manager, if one is provided.
    # Whenever "tools" or "parameters" cannot be taken from the manager's
    # config, fall back to an empty AttMapEcho so the same code works for a
    # command with and without a pipeline manager.
    if pm is not None:
        self.pm = pm
        if hasattr(pm.config, "tools"):
            self.tools = self.pm.config.tools
        else:
            self.tools = AttMapEcho()
        if hasattr(pm.config, "parameters"):
            self.parameters = self.pm.config.parameters
        else:
            self.parameters = AttMapEcho()
    else:
        self.tools = AttMapEcho()
        # Previously omitted: without a pipeline manager "parameters" was
        # never initialised, unlike "tools".
        self.parameters = AttMapEcho()
def _read_schema(schema):
    """
    Safely read schema from YAML-formatted file.

    :param str | Mapping schema: path to the schema file
        or schema in a dict form
    :return Mapping: read schema; a schema that is already a Mapping
        (e.g. a dict) is returned unchanged
    :raise TypeError: if the schema arg is neither a Mapping nor a str
    """
    # Imported locally so the block does not rely on a module-level import.
    from collections.abc import Mapping

    if isinstance(schema, str):
        # Strings are delegated to the YAML loader.
        return _load_yaml(schema)
    # Accept any Mapping, as documented, not only plain dict instances.
    if isinstance(schema, Mapping):
        return schema
    raise TypeError("schema has to be either a dict, URL to remote schema "
                    "or a path to an existing file")
def read_schema(schema):
    """
    Safely read schema from YAML-formatted file.

    :param str | Mapping schema: path to the schema file
        or schema in a dict form
    :return Mapping: read schema; a schema that is already a Mapping
        (e.g. a dict) is returned unchanged
    :raise TypeError: if the schema arg is neither a Mapping nor a str
    """
    # Imported locally so the block does not rely on a module-level import.
    from collections.abc import Mapping

    if isinstance(schema, str):
        # Strings are delegated to the YAML loader.
        return _load_yaml(schema)
    # Accept any Mapping, as documented, not only plain dict instances.
    if isinstance(schema, Mapping):
        return schema
    raise TypeError("schema has to be either a dict, URL to remote schema "
                    "or a path to an existing file")
def main():
""" main workflow """
parser = build_argparser()
args, remaining_args = parser.parse_known_args()
cfg = select_config(args.config, refgenconf.CFG_ENV_VARS, check_exist=True, strict_env=True)
if not cfg:
raise MissingGenomeConfigError(args.config)
rgc = refgenconf.RefGenConf(filepath=cfg, writable=True)
pths = [args.path, mkabs(args.path, rgc.genome_folder)]
if not untar_or_copy(pths[0], os.path.join(rgc.genome_folder, args.genome)) \
and not untar_or_copy(pths[1], os.path.join(rgc.genome_folder, args.genome)):
rgc.unlock()
raise OSError("Path '{}' does not exist. Tried: {}".format(args.path, " and ".join(pths)))
path_components = [rgc.genome_folder] + [args.genome] + ["*"] * 3 + ["Sequence"]
assets_paths = glob(os.path.join(*path_components))
assert len(assets_paths) > 0, OSError("Your iGenomes directory is corrupted, more than one directory matched by {}."
"\nMatched dirs: {}".format(os.path.join(*path_components),
", ".join(assets_paths)))
assets_path = assets_paths[0]
asset_names = [d for d in os.listdir(assets_path) if os.path.isdir(assets_path)]
processed = []