Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with pytest.raises(ConfigTypeError):
cfg.user.pop("name")
with pytest.raises(ConfigTypeError):
cfg.user.pop("bar")
with pytest.raises(ConfigTypeError):
cfg.user.pop("bar", "not even with default")
# Unlocking the top level node is not enough.
with pytest.raises(ConfigTypeError):
with open_dict(cfg):
cfg.user.pop("name")
# You need to unlock the specified structured node to pop a field from it.
with open_dict(cfg.user):
cfg.user.pop("name")
assert "name" not in cfg.user
def test_struct_override(src: Any, func: Any, expectation: Any) -> None:
    """Run ``func`` against a struct-flagged config, then again under ``open_dict``.

    :param src: input handed to ``OmegaConf.create`` to build the config.
    :param func: callable invoked with the config as its only argument.
    :param expectation: context manager wrapping the call made while the
        struct flag is set.
    """
    cfg = OmegaConf.create(src)
    OmegaConf.set_struct(cfg, True)

    # First call: struct flag is on, outcome is dictated by ``expectation``.
    with expectation:
        func(cfg)

    # Second call: same operation inside open_dict must complete without raising.
    with does_not_raise(), open_dict(cfg):
        func(cfg)
def test_dict_structured_delitem() -> None:
    """Deleting a key from a structured config raises unless done under ``open_dict``."""
    cfg = OmegaConf.structured(User(name="Bond"))

    # Plain deletion is expected to be rejected with ConfigTypeError.
    with pytest.raises(ConfigTypeError):
        del cfg["name"]

    # Inside open_dict the same deletion goes through.
    with open_dict(cfg):
        del cfg["name"]

    assert "name" not in cfg
def load_sweep_config(self, master_config, sweep_overrides):
    """
    Build the config for a single sweep instance.

    :param self:
    :param master_config: config whose ``hydra.overrides.hydra``,
        ``hydra.job.config_file`` and ``hydra.runtime`` nodes are read here
    :param sweep_overrides: overrides appended after the master config's hydra overrides
    :return: the freshly loaded sweep config
    """
    # Recreate the config for this sweep instance with the appropriate overrides
    hydra_overrides = OmegaConf.to_container(master_config.hydra.overrides.hydra)
    combined_overrides = hydra_overrides + sweep_overrides

    sweep_config = self.load_configuration(
        config_file=master_config.hydra.job.config_file,
        strict=self.default_strict,
        overrides=combined_overrides,
    )

    # Carry the master's runtime node over into the new config.
    with open_dict(sweep_config):
        sweep_config.hydra.runtime.merge_with(master_config.hydra.runtime)

    # Copy old config cache to ensure we get the same resolved values (for things like timestamps etc)
    OmegaConf.copy_cache(from_config=master_config, to_config=sweep_config)

    return sweep_config
def compose_config(
    self, config_file, overrides, strict=None, with_log_configuration=False
):
    """
    Load a config through ``self.config_loader`` and stamp runtime info on it.

    :param self:
    :param config_file: forwarded to ``load_configuration``
    :param overrides: forwarded to ``load_configuration``
    :param with_log_configuration: True to configure logging subsystem from the loaded config
    :param strict: None for default behavior (default to true for config file, false if no config file).
                   otherwise forces specific behavior.
    :return: the loaded config, with ``hydra.runtime.version`` and ``hydra.runtime.cwd`` set
    """
    # The module-level logger is rebound below once logging has been configured.
    global log

    cfg = self.config_loader.load_configuration(
        config_file=config_file, overrides=overrides, strict=strict
    )

    with open_dict(cfg):
        from .. import __version__

        cfg.hydra.runtime.version = __version__
        cfg.hydra.runtime.cwd = os.getcwd()

    if not with_log_configuration:
        return cfg

    configure_log(cfg.hydra.hydra_logging, cfg.hydra.verbose)
    log = logging.getLogger(__name__)
    self._print_debug_info()
    return cfg
def launch(self, job_overrides):
    """
    Record the job count on the config and build the submitit executor for ``self.queue``.

    :param self:
    :param job_overrides: sized collection with one entry per job; must not be empty
    :raises RuntimeError: if ``self.queue`` is not one of "auto", "slurm", "chronos", "local"
    """
    # Lazy import to ensure plugin discovery remains fast
    import submitit

    job_count = len(job_overrides)
    assert job_count > 0

    with open_dict(self.config):
        self.config.hydra.job.num_jobs = job_count

    # Pick the executor class that matches the configured queue.
    queue = self.queue
    if queue == "auto":
        executor = submitit.AutoExecutor(
            folder=self.folder, conda_file=self.conda_file
        )
    elif queue == "slurm":
        executor = submitit.SlurmExecutor(folder=self.folder)
    elif queue == "chronos":
        executor = submitit.ChronosExecutor(
            folder=self.folder, conda_file=self.conda_file
        )
    elif queue == "local":
        executor = submitit.LocalExecutor(folder=self.folder)
    else:
        raise RuntimeError("Unsupported queue type {}".format(queue))
    # NOTE(review): ``executor`` is not used within this block - the remainder
    # of the method appears to be missing from this excerpt; confirm upstream.
def launch(self, job_overrides):
    """
    Run each entry of ``job_overrides`` in turn and collect the ``run_job`` results.

    :param self:
    :param job_overrides: iterable of override sequences, one per job
    :return: list with the value returned by ``run_job`` for every job, in order
    """
    setup_globals()
    configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)

    # Make sure the sweep directory exists before any job runs.
    Path(str(self.config.hydra.sweep.dir)).mkdir(parents=True, exist_ok=True)

    log.info("Launching {} jobs locally".format(len(job_overrides)))

    results = []
    for job_idx, job_args in enumerate(job_overrides):
        log.info("\t#{} : {}".format(job_idx, " ".join(filter_overrides(job_args))))

        job_config = self.config_loader.load_sweep_config(
            self.config, list(job_args)
        )
        # Tag the per-job config with its position in the sweep.
        with open_dict(job_config):
            job_config.hydra.job.id = job_idx
            job_config.hydra.job.num = job_idx
        HydraConfig().set_config(job_config)

        results.append(
            run_job(
                config=job_config,
                task_function=self.task_function,
                job_dir_key="hydra.sweep.dir",
                job_subdir_key="hydra.sweep.subdir",
            )
        )
        # Re-apply the launcher's logging configuration after the job;
        # presumably run_job reconfigures logging - TODO confirm.
        configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)

    return results
dest.__setitem__(key, src_value)
else:
if isinstance(src_value, BaseContainer):
dest.__setitem__(key, src_value)
else:
assert isinstance(dest_node, ValueNode)
try:
dest_node._set_value(src_value)
except (ValidationError, ReadonlyConfigError) as e:
dest._format_and_raise(key=key, value=src_value, cause=e)
else:
from omegaconf import open_dict
if is_structured_config(src_type):
# verified to be compatible above in _validate_set_merge_impl
with open_dict(dest):
dest[key] = src._get_node(key)
else:
dest[key] = src._get_node(key)
if src_type is not None and not is_primitive_dict(src_type):
dest._metadata.object_type = src_type
# explicit flags on the source config are replacing the flag values in the destination
for flag, value in src._metadata.flags.items():
if value is not None:
dest._set_flag(flag, value)