bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]

emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
test_s3_table = S3Table(emr_system, destination_table)

# Put landing data.
self.dump_data_to_s3(
    os.path.join(test_s3_table.dir_landing_final, landing_dataset),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|"
)

M3D.load_table(*table_config)

# Since the data move operations are offloaded to EMR steps, dir_landing_final
# will still contain the old files and dir_landing_archive will not contain new files.
landing_files = self.get_child_objects(test_s3_table.dir_landing_final)
assert len(landing_files) == 1
assert landing_files[0] == os.path.join(test_s3_table.dir_landing_final, landing_dataset)

landing_archive_files = self.get_child_objects(test_s3_table.dir_landing_archive)
assert len(landing_archive_files) == 0

# Check EMR steps.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
expected_algorithms_jar_path = "s3://" + bucket_application + os.path.join(

# Same scenario, but passing explicit Spark parameters through to M3D.load_table.
emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
test_s3_table = S3Table(emr_system, destination_table)

# Put landing data.
self.dump_data_to_s3(
    os.path.join(test_s3_table.dir_landing_final, landing_dataset),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|"
)

M3D.load_table(*table_config, spark_params=json.dumps(spark_external_parameters))

# The psv file will still be in landing, since the move operation should be
# performed by an EMR step, which is mocked here. Accordingly, the archive will
# still be empty.
landing_files = self.get_child_objects(test_s3_table.dir_landing_final)
assert len(landing_files) == 1
assert landing_files[0] == os.path.join(test_s3_table.dir_landing_final, landing_dataset)

landing_archive_files = self.get_child_objects(test_s3_table.dir_landing_archive)
assert len(landing_archive_files) == 0

# Check EMR steps.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
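
# A minimal sketch of the kind of dictionary serialized with json.dumps above. The
# keys are standard Spark settings; the values are illustrative assumptions, not
# taken from the project's configuration.
spark_external_parameters = {
    "spark.driver.memory": "5G",
    "spark.executor.instances": "10",
    "spark.executor.memory": "25G"
}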

def load_table(self, emr_cluster_id, spark_parameters=None):
    if spark_parameters is None:
        M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id]))
    else:
        M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id, spark_parameters]))
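
# Hedged usage sketch for the helper above; `scenario` (an instance of the test
# class that defines load_table) and the Spark settings are assumptions here:
# scenario.load_table(emr_cluster_id, spark_parameters=json.dumps({"spark.executor.instances": "10"}))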

# Delta-load scenario with an active table and a changelog table; the EMRSystem
# setup is assumed to mirror the snippets above.
emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
s3_table_active = S3Table(emr_system, destination_active_table)
s3_table_changelog = S3Table(emr_system, destination_changelog_table)

# Extract bucket names.
bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]

# Put lake data for the changelog table; this should be archived.
self.dump_data_to_s3(
    os.path.join(s3_table_changelog.dir_lake_final, "changelog.parquet"),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|",
)

M3D.load_table(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_active_table,
    load_type,
    self.emr_cluster_id,
    spark_params=spark_external_parameters
)

filename_json = "delta_load-{environment}-{table}.json".format(
    environment=destination_environment,
    table=destination_active_table
)

# Checking the configuration file for m3d-engine.
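
# A minimal sketch of what such a check could look like; `dir_apps_delta_load` is an
# assumption about where the generated parameter file is uploaded, not a confirmed
# attribute of S3Table.
app_files = self.get_child_objects(s3_table_active.dir_apps_delta_load)
assert any(f.endswith(filename_json) for f in app_files)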

# Same delta-load scenario, but serializing the Spark parameters to JSON before
# handing them to M3D.load_table; the EMRSystem setup is again assumed to mirror
# the snippets above.
emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
s3_table_active = S3Table(emr_system, destination_active_table)
s3_table_changelog = S3Table(emr_system, destination_changelog_table)

# Extract bucket names.
bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]

# Put lake data for the changelog table; this should be archived.
self.dump_data_to_s3(
    os.path.join(s3_table_changelog.dir_lake_final, "changelog.parquet"),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|",
)

M3D.load_table(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_active_table,
    load_type,
    self.emr_cluster_id,
    spark_params=json.dumps(spark_external_parameters)
)

filename_json = "delta_load-{environment}-{table}.json".format(
    environment=destination_environment,
    table=destination_active_table
)

# Checking the configuration file for m3d-engine.
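
# Sketch of a follow-up check, assuming moto's fake EMR step exposes its submitted
# arguments via `args`: the step should reference the generated delta-load
# configuration file.
fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert any(filename_json in str(arg) for arg in fake_cluster.steps[0].args)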

# Inside m3d, the load is dispatched to a load wrapper and the EMR cluster is
# tagged with the API method, load type and target table.
dataset = DataSetFactory.create_dataset(
    execution_system,
    load_type,
    data_type,
    destination_table
)

self._load_wrapper = available_loads[load_type](
    execution_system=self._execution_system,
    dataset=dataset,
    load_params=load_params
)

self._execution_system.add_cluster_tags({
    EMRSystem.EMRClusterTag.API_METHOD: M3D.load_table.__name__,
    EMRSystem.EMRClusterTag.LOAD_TYPE: load_type,
    EMRSystem.EMRClusterTag.TARGET_TABLE: destination_table
})
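
# A hedged sketch of how a dispatch table like `available_loads` above can map load
# types to wrapper classes; the load-type keys and the wrapper class here are
# illustrative assumptions, not the project's actual definitions.
class _ExampleLoadWrapper:
    def __init__(self, execution_system, dataset, load_params):
        # Store the collaborators the same way the dispatch above passes them in.
        self.execution_system = execution_system
        self.dataset = dataset
        self.load_params = load_params

available_loads = {
    "FullLoad": _ExampleLoadWrapper,
    "DeltaLoad": _ExampleLoadWrapper,
    "AppendLoad": _ExampleLoadWrapper,
}

# Selecting and instantiating a wrapper then works exactly like the dispatch above.
wrapper = available_loads["DeltaLoad"](execution_system=None, dataset=None, load_params={})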