assert len(app_files) == 1
assert app_files[0] == s3_table_active.dir_apps_delta_load + filename_json
delta_load_config_s3 = app_files[0]
delta_load_config_content = self.get_object_content_from_s3(delta_load_config_s3)
load_table_parameters = json.loads(delta_load_config_content)
assert load_table_parameters["active_records_table_lake"] == s3_table_active.db_table_lake
assert load_table_parameters["active_records_dir_lake"] == s3_table_active.dir_lake_final
assert load_table_parameters["delta_records_file_path"] == s3_table_active.dir_landing_data
assert load_table_parameters["technical_key"] == ["m3d_timestamp", "datapakid", "partno", "record"]
assert load_table_parameters["business_key"] == s3_table_active.business_key
if s3_table_active.partitioned_by in Util.defined_partitions:
    target_partitions = Util.get_target_partitions_list(s3_table_active.partitioned_by)
else:
    target_partitions = s3_table_active.partitioned_by
assert load_table_parameters["target_partitions"] == target_partitions
assert load_table_parameters["partition_column"] == s3_table_active.partition_column
assert load_table_parameters["partition_column_format"] == s3_table_active.partition_column_format
# Check EMR steps.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
expected_algorithms_jar_path = "s3://" + bucket_application + os.path.join(
    scon_emr_dict["environments"][destination_environment]["s3_deployment_dir_base"],
    destination_environment,
def get_projection_columns(self, src_column_names, destination_column_names):
    # Keep only the (source, destination) pairs that actually have a destination name.
    columns = list(filter(lambda x: x[1], zip(src_column_names, destination_column_names)))

    if self.partitioned_by in Util.defined_partitions:
        # Defined partition types contribute additional (name, name) projection pairs.
        target_partitions = list(map(lambda x: (x, x), Util.get_target_partitions_list(self.partitioned_by)))
        return columns + target_partitions
    else:
        return columns
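# A minimal, self-contained sketch of the projection logic above, using made-up column names
# (illustration only, not taken from the project): pairs with an empty destination name are
# filtered out, and a defined partition type would append its columns as (name, name) pairs.
src_column_names = ["id", "name", "obsolete_col"]
destination_column_names = ["id", "full_name", ""]
columns = list(filter(lambda x: x[1], zip(src_column_names, destination_column_names)))
# columns == [("id", "id"), ("name", "full_name")]
target_partitions = list(map(lambda x: (x, x), ["year", "month"]))  # assuming a "month"-style partition type
# columns + target_partitions == [..., ("year", "year"), ("month", "month")]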
def _get_create_lake_statement(self, table_location):
    def create_statement(_columns, _target_partitions=None):
        return HQLGenerator.CreateParquetTableStatementBuilder(self.db_table_lake, table_location, _columns) \
            .partitioned_by(_target_partitions) \
            .with_properties({"serialization.encoding": "UTF-8"}) \
            .build(is_external=True)

    if self.partitioned_by in Util.defined_partitions:
        return create_statement(self.columns_lake, Util.get_typed_target_partitions_hive(self.partitioned_by))
    elif len(self.partitioned_by) > 0:
        matched_columns = list(filter(lambda x: x[0] == self.partitioned_by, self.columns_lake))
        if len(matched_columns) > 0:
            # When the table is partitioned by one of its own columns,
            # the partition column has to be excluded from the list of regular columns.
            columns = filter(lambda x: x[0] != self.partitioned_by, self.columns_lake)
            target_partitions = [(matched_columns[0][0], matched_columns[0][1])]
            return create_statement(columns, target_partitions)
        else:
            raise Exception("Partitioned field {} doesn't match any column".format(self.partitioned_by))
    else:
        return create_statement(self.columns_lake)
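# Illustration of the column/partition split performed above when partitioned_by names one of
# the table's own columns. The column list and types below are made up for this sketch; the
# real method then passes `columns` and `target_partitions` on to HQLGenerator.
columns_lake = [("name", "string"), ("amount", "decimal(10,2)"), ("year", "smallint")]
partitioned_by = "year"
matched_columns = list(filter(lambda x: x[0] == partitioned_by, columns_lake))
columns = list(filter(lambda x: x[0] != partitioned_by, columns_lake))
target_partitions = [(matched_columns[0][0], matched_columns[0][1])]
# columns == [("name", "string"), ("amount", "decimal(10,2)")]
# target_partitions == [("year", "smallint")]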
def merge_nested_dicts(dict1, dict2):
    """
    Merge two dictionaries, overwriting values from dict1 with the ones contained in dict2.
    The merge takes nesting into consideration.

    :param dict1: dictionary with default values
    :param dict2: dictionary containing overwrite values
    :return: dictionary containing the combined parameters
    """
    combined = dict1.copy()
    for key, value in dict2.items():
        # collections.abc.Mapping is the Python 3 location; the bare collections.Mapping
        # alias was removed in Python 3.10.
        if isinstance(value, collections.abc.Mapping):
            combined[key] = Util.merge_nested_dicts(combined.get(key, {}), value)
        else:
            combined[key] = value
    return combined
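# Usage sketch for merge_nested_dicts with made-up configuration values: nested keys from the
# override dictionary win level by level instead of replacing whole sub-dictionaries.
defaults = {"emr": {"instance_count": 3, "version": "emr-5.30.0"}, "verbose": False}
overrides = {"emr": {"instance_count": 10}}
merged = Util.merge_nested_dicts(defaults, overrides)
# merged == {"emr": {"instance_count": 10, "version": "emr-5.30.0"}, "verbose": False}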
def get_target_partitions_list(partitioned_by):
    """
    Return list of partition columns based on partitioned_by.

    :param partitioned_by: type of partition
    :return: list of partition columns
    :raises Exception: if the partition type is not supported
    """
    if not partitioned_by:
        return []
    elif partitioned_by in Util.defined_partitions:
        return Util.defined_partitions[partitioned_by]
    else:
        raise Exception("Partition type " + partitioned_by + " not supported")
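# Behaviour sketch for get_target_partitions_list. The exact contents of Util.defined_partitions
# are an assumption here (e.g. a mapping like {"month": ["year", "month"]}), not taken from this
# snippet:
#   Util.get_target_partitions_list("")       -> []
#   Util.get_target_partitions_list("month")  -> ["year", "month"]
#   Util.get_target_partitions_list("weekly") -> Exception("Partition type weekly not supported")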
):
    """
    Create algorithm configuration object from acon file. The method will discover the acon file
    based on the parameters passed to it.

    :return: Returns algorithm configuration object of the type that is used for calling the method.
    """
    # Create config service to get acon file path.
    config_service = ConfigService(config_path)
    acon_path = config_service.get_acon_path(
        destination_database,
        destination_environment,
        algorithm_instance
    )
    acon_dict = Util.load_dict(acon_path)

    if ext_params_str:
        ext_params_dict = json.loads(ext_params_str)
        acon_dict = Util.merge_nested_dicts(acon_dict, ext_params_dict)

    return AlgorithmConfigurationHadoop(algorithm_instance, acon_dict)
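# Sketch of the external-parameter override path above, with a made-up acon dictionary and a
# made-up JSON override string; nested values win via Util.merge_nested_dicts.
acon_dict = {"parameters": {"spark": {"executor_memory": "4G"}, "partition_column": "year"}}
ext_params_str = '{"parameters": {"spark": {"executor_memory": "8G"}}}'
acon_dict = Util.merge_nested_dicts(acon_dict, json.loads(ext_params_str))
# acon_dict["parameters"]["spark"]["executor_memory"] == "8G"
# acon_dict["parameters"]["partition_column"] == "year"   (left untouched)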
def read_acon_params(execution_system, table_name):
    config_service = ConfigService(execution_system.config)
    acon_path = config_service.get_acon_path(
        execution_system.database,
        execution_system.environment,
        table_name
    )
    acon_dict = Util.load_dict(acon_path)
    return acon_dict.get(LoadHadoop.PARAMETERS_KEY, {})
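# Illustration of what read_acon_params extracts, assuming LoadHadoop.PARAMETERS_KEY refers to a
# "parameters" key and an acon file shaped roughly like the made-up dictionary below.
acon_dict = {
    "environment": {"emr_cluster_id": "j-XXXXXXXX"},
    "parameters": {"business_key": ["id"], "partition_column": "created_at"}
}
params = acon_dict.get("parameters", {})
# params == {"business_key": ["id"], "partition_column": "created_at"}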
def get_target_partitions(self):
    if not self.partitioned_by or self.partitioned_by in util.Util.defined_partitions:
        return util.Util.get_target_partitions_list(self.partitioned_by)
    else:
        return self.partitioned_by