Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_algorithm_configuration(acon_dict):
    """Build an AlgorithmConfigurationHadoop for the partition_materialization algorithm.

    :param acon_dict: acon configuration dictionary used to initialize the object
    :return: a new AlgorithmConfigurationHadoop instance
    """
    algorithm_instance = "partition_materialization"
    return AlgorithmConfigurationHadoop(algorithm_instance, acon_dict)
def _create_algorithm_configuration(acon_dict):
    """Return a partition_materialization AlgorithmConfigurationHadoop built from *acon_dict*."""
    return AlgorithmConfigurationHadoop(
        "partition_materialization",
        acon_dict,
    )
# NOTE(review): the two closing braces below are the tail of a larger dict
# literal whose opening lines are outside this chunk — confirm against the
# full file before editing.
}
}
# Expected "algorithm" section of the acon configuration (test fixture);
# presumably compared against get_algorithm_params() further down — verify.
algorithms = {
"python_class": test_python_class,
"parameters": {}
}
# Expected Spark submission parameters (test fixture);
# presumably compared against get_spark_params() — verify.
spark_params = {
"spark.executor.instances": "5",
"spark.executor.memory": "25G"
}
# Stub out acon-file loading and dict merging so that .create() ends up
# using the canned test_dict instead of reading anything from disk.
with patch('m3d.util.util.Util.load_dict', return_value={}):
with patch('m3d.util.util.Util.merge_nested_dicts', return_value=test_dict):
# Build the configuration under test; the trailing JSON string plays the
# role of the external-parameters override (ext_params_str).
algorithm_configuration = AlgorithmConfigurationHadoop.create(
config,
"bdp_test",
"test",
"gzip_decompressor_bytes",
"""
{
"environment": {
"emr_cluster_id": "test_id"
}
}
"""
)
# The accessors should reflect the merged acon dict (test_dict); the
# expected values (test_python_class, spark_params, algorithms) are
# fixtures defined earlier in the enclosing test function.
assert algorithm_configuration.get_python_class() == test_python_class
assert algorithm_configuration.get_spark_params() == spark_params
assert algorithm_configuration.get_algorithm_params() == algorithms
# NOTE(review): the lines below are the tail of an acon fixture dict whose
# opening (and the enclosing test function's header) are outside this chunk.
"spark.executor.instances": "5",
"spark.executor.memory": "25G"
}
},
"algorithm": {
"python_class": test_python_class,
"parameters": {}
}
}
# Expected value of the "algorithm" section exposed by the configuration.
expected_algorithm_section = {
"python_class": "test_python_class",
"parameters": {}
}
# Construct the object directly (no .create() factory, no mocks).
algorithm_configuration = AlgorithmConfigurationHadoop(test_algorithm_instance, acon_dict)
# Check python class
assert test_python_class == algorithm_configuration.get_python_class()
# Check that the instance name and the algorithm section round-trip intact.
assert test_algorithm_instance == algorithm_configuration.get_algorithm_instance()
assert expected_algorithm_section == algorithm_configuration.get_algorithm_params()
"""
# NOTE(review): this is the body of a factory method (likely
# AlgorithmConfigurationHadoop.create) whose signature is outside this
# chunk — the closing triple-quote above ends its docstring. Confirm.
# Create config service to get acon file path.
config_service = ConfigService(config_path)
acon_path = config_service.get_acon_path(
destination_database,
destination_environment,
algorithm_instance
)
# Load the acon file from disk into a dict.
acon_dict = Util.load_dict(acon_path)
# Optional JSON string of external overrides; when present it is parsed
# and deep-merged on top of the file-based configuration.
if ext_params_str:
ext_params_dict = json.loads(ext_params_str)
acon_dict = Util.merge_nested_dicts(acon_dict, ext_params_dict)
return AlgorithmConfigurationHadoop(algorithm_instance, acon_dict)
# NOTE(review): parameter-list tail of a factory function (likely an
# AlgorithmExecutorHadoop.create / module-level create) whose `def` line
# is outside this chunk — confirm against the full file.
config_path,
destination_system,
destination_database,
destination_environment,
algorithm_instance,
emr_cluster_id,
ext_params_str
):
# Resolve the target data system from the destination coordinates.
data_system = DataSystem(
config_path,
destination_system,
destination_database,
destination_environment
)
# Only EMR-backed systems are supported; anything else is rejected below.
if data_system.database_type == DataSystem.DatabaseType.EMR:
# Build the algorithm configuration (acon file + optional overrides).
config = AlgorithmConfigurationHadoop.create(
config_path,
destination_database,
destination_environment,
algorithm_instance,
ext_params_str
)
# Bind the execution system to the given EMR cluster and wrap both
# in the executor that will actually run the algorithm.
execution_system = EMRSystem.from_data_system(data_system, emr_cluster_id)
return AlgorithmExecutorHadoop(execution_system, config)
else:
raise M3DUnsupportedDatabaseTypeException(data_system.database_type)