Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): headerless fragment — the enclosing test method's `def` line is
# not visible in this chunk and the final expression below ends mid-call
# (unclosed `os.path.join(`). Code left byte-identical; comments only.
# Verify the generated delta-load config file landed at the expected S3 key.
assert app_files[0] == s3_table_active.dir_apps_delta_load + filename_json
delta_load_config_s3 = app_files[0]
delta_load_config_content = self.get_object_content_from_s3(delta_load_config_s3)
# The config is JSON; each field is checked against the table descriptor.
load_table_parameters = json.loads(delta_load_config_content)
assert load_table_parameters["active_records_table_lake"] == s3_table_active.db_table_lake
assert load_table_parameters["active_records_dir_lake"] == s3_table_active.dir_lake_final
assert load_table_parameters["delta_records_file_path"] == s3_table_active.dir_landing_data
assert load_table_parameters["technical_key"] == ["m3d_timestamp", "datapakid", "partno", "record"]
assert load_table_parameters["business_key"] == s3_table_active.business_key
# Known partition types expand to a column list; anything else is passed through.
if s3_table_active.partitioned_by in Util.defined_partitions:
target_partitions = Util.get_target_partitions_list(s3_table_active.partitioned_by)
else:
target_partitions = s3_table_active.partitioned_by
assert load_table_parameters["target_partitions"] == target_partitions
assert load_table_parameters["partition_column"] == s3_table_active.partition_column
assert load_table_parameters["partition_column_format"] == s3_table_active.partition_column_format
# Check EMR steps.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
# NOTE(review): expression truncated here in the source chunk — no closing paren.
expected_algorithms_jar_path = "s3://" + bucket_application + os.path.join(
scon_emr_dict["environments"][destination_environment]["s3_deployment_dir_base"],
destination_environment,
scon_emr_dict["subdir"]["m3d"],
m3d_config_dict["subdir_projects"]["m3d_api"],
def test_get_target_partitions_list(self):
    """
    Verify Util.get_target_partitions_list expands each supported partition
    type to its full column list and raises for an unsupported type.
    :return:
    """
    expected_by_type = {
        "year": ["year"],
        "month": ["year", "month"],
        "day": ["year", "month", "day"],
        "": [],
    }
    for partition_type, expected_columns in expected_by_type.items():
        assert Util.get_target_partitions_list(partition_type) == expected_columns

    with pytest.raises(Exception) as exc_info:
        Util.get_target_partitions_list("country")

    assert "Partition type country not supported" in str(exc_info.value)
# NOTE(review): duplicate headerless fragment of the block above — the
# enclosing `def` is not visible and the final expression ends mid-call
# (unclosed `os.path.join(`). Code left byte-identical; comments only.
# Verify the generated delta-load config file landed at the expected S3 key.
assert app_files[0] == s3_table_active.dir_apps_delta_load + filename_json
delta_load_config_s3 = app_files[0]
delta_load_config_content = self.get_object_content_from_s3(delta_load_config_s3)
# The config is JSON; each field is checked against the table descriptor.
load_table_parameters = json.loads(delta_load_config_content)
assert load_table_parameters["active_records_table_lake"] == s3_table_active.db_table_lake
assert load_table_parameters["active_records_dir_lake"] == s3_table_active.dir_lake_final
assert load_table_parameters["delta_records_file_path"] == s3_table_active.dir_landing_data
assert load_table_parameters["technical_key"] == ["m3d_timestamp", "datapakid", "partno", "record"]
assert load_table_parameters["business_key"] == s3_table_active.business_key
# Known partition types expand to a column list; anything else is passed through.
if s3_table_active.partitioned_by in Util.defined_partitions:
target_partitions = Util.get_target_partitions_list(s3_table_active.partitioned_by)
else:
target_partitions = s3_table_active.partitioned_by
assert load_table_parameters["target_partitions"] == target_partitions
assert load_table_parameters["partition_column"] == s3_table_active.partition_column
assert load_table_parameters["partition_column_format"] == s3_table_active.partition_column_format
# Check EMR steps.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
# NOTE(review): expression truncated here in the source chunk — no closing paren.
expected_algorithms_jar_path = "s3://" + bucket_application + os.path.join(
scon_emr_dict["environments"][destination_environment]["s3_deployment_dir_base"],
destination_environment,
scon_emr_dict["subdir"]["m3d"],
def test_get_target_partitions_list(self):
    """
    This method tests the correct functionality of get_target_partitions_list
    of the Util class: each partition type maps to its column list, and an
    unsupported type raises.
    :return:
    """
    cases = [
        ("year", ["year"]),
        ("month", ["year", "month"]),
        ("day", ["year", "month", "day"]),
        ("", []),
    ]
    for partitioned_by, expected_partitions in cases:
        assert Util.get_target_partitions_list(partitioned_by) == expected_partitions

    with pytest.raises(Exception) as exc_info:
        Util.get_target_partitions_list("country")
    assert "Partition type country not supported" in str(exc_info.value)
def test_get_defined_partition_columns_hive(self):
    """
    This method tests the correct functionality of
    get_defined_target_partitions_hive of Util class: partition types expand
    to comma-separated Hive column definitions, and an unsupported type raises.
    :return:
    """
    assert Util.get_defined_target_partitions_hive("year") == "year smallint"
    assert Util.get_defined_target_partitions_hive("month") == "year smallint,month smallint"
    assert Util.get_defined_target_partitions_hive("day") == "year smallint,month smallint,day smallint"
    assert Util.get_defined_target_partitions_hive("") == ""
    with pytest.raises(Exception) as exc_info:
        # BUG FIX: this previously called Util.get_target_partitions_list
        # (copy-paste from the sibling test), so the error path of
        # get_defined_target_partitions_hive was never exercised.
        Util.get_defined_target_partitions_hive("country")
    assert "Partition type country not supported" in str(exc_info.value)
def build_params(self):
    """
    Build and return the DeltaLoadParams for the wrapped dataset.
    :return: DeltaLoadParams populated from self._dataset
    """
    dataset = self._dataset
    # Positional arguments follow the DeltaLoadParams constructor order.
    return DeltaLoadParams(
        dataset.db_table_lake,
        dataset.dir_lake_final,
        dataset.dir_landing_data,
        ["m3d_timestamp", "datapakid", "partno", "record"],
        dataset.business_key,
        util.Util.get_target_partitions_list(dataset.partitioned_by),
        dataset.partition_column,
        dataset.partition_column_format
    )
def get_projection_columns(self, src_column_names, destination_column_names):
    """
    Pair each source column with its destination name, keeping only pairs
    whose destination name is truthy; when the table uses a defined partition
    type, append identity (name, name) pairs for the derived partition columns.
    :param src_column_names: source column names
    :param destination_column_names: destination column names (same order)
    :return: list of (source, destination) tuples
    """
    columns = [pair for pair in zip(src_column_names, destination_column_names) if pair[1]]
    if self.partitioned_by not in Util.defined_partitions:
        return columns
    partition_pairs = [
        (partition, partition)
        for partition in Util.get_target_partitions_list(self.partitioned_by)
    ]
    return columns + partition_pairs
def get_defined_target_partitions_hive(partitioned_by):
    """
    Return partition columns as comma separated String with Hive data types
    :param partitioned_by: type of partition (year, month, day, or "")
    :return: partition columns as comma separated String with Hive data types,
             "" when there are no partition columns
    """
    cols = Util.get_target_partitions_list(partitioned_by)
    # str.join is the idiomatic, linear-time replacement for the previous
    # functools.reduce concatenation; it also yields "" for an empty list,
    # so no explicit conditional is needed.
    return ",".join(col + " smallint" for col in cols)