Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for key, value in obj.items():
if key == "user_data": # base64 encode user data
with open(os.path.join(os.getcwd(), obj[key]), "rb") as f:
translated_obj[key] = b64encode(f.read())
elif key == "__type__": # do legacy object initialisation
return Translator().translate_hierarchy(obj)
else:
translated_obj[key] = translate_config(value)
return translated_obj
elif isinstance(obj, list):
return [translate_config(item) for item in obj]
else:
return obj
@plugin_constraints(before={"pipeline"})
class Configuration(Borg):
_shared_state = AttributeDict()
def __init__(self, configuration: [str, dict] = None):
super(Configuration, self).__init__()
if configuration:
if isinstance(configuration, str): # interpret string as file name
self.load_config(configuration)
else:
self.update_config(configuration)
def load_config(self, config_file: str) -> None:
"""
Loads YAML configuration file into shared state of the configuration borg
:param config_file: The name of the configuration file to be loaded
:type config_file: str