    destination_environment,
    destination_table
]
table_config_kwargs = {
    "emr_cluster_id": self.emr_cluster_id
}
emr_steps_completer = self.create_emr_steps_completer(expected_steps_count=1, timeout_seconds=3)
with ConcurrentExecutor(emr_steps_completer, delay_sec=0.4):
    logging.info("Calling M3D.create_lake_out_view().")
    M3D.create_lake_out_view(*table_config, **table_config_kwargs)
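# The ConcurrentExecutor pattern above appears to be needed because moto's mocked EMR
# does not finish steps on its own; a completer runs in parallel and flips the pending
# steps to a terminal state so the M3D call does not block. A rough, hypothetical
# sketch of such a completer is shown below; the real create_emr_steps_completer
# helper and moto's internal step objects may differ (step.state is an assumed attribute).
import time

def fake_emr_steps_completer(mock_cluster, expected_steps_count, timeout_seconds):
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        if len(mock_cluster.steps) >= expected_steps_count:
            for step in mock_cluster.steps:
                step.state = "COMPLETED"  # assumption about moto's fake step object
            return
        time.sleep(0.1)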
emr_system = EMRSystem(*table_config[:5])
s3_table = S3Table(emr_system, destination_table)
mock_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(mock_cluster.steps)
hive_step = mock_cluster.steps[0]
assert hive_step.args[0] == "hive"
assert hive_step.args[1] == "--silent"
assert hive_step.args[2] == "-f"
actual_hql_content_in_bucket = self.get_object_content_from_s3(hive_step.args[3])
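# Hypothetical stand-in for self.get_object_content_from_s3 used above, assuming the
# fourth Hive step argument is an "s3://bucket/key" URI and that boto3 credentials are
# configured; the project's actual test helper may resolve the path differently.
import boto3
from urllib.parse import urlparse

def get_object_content_from_s3(s3_uri):
    parsed = urlparse(s3_uri)
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    return body.read().decode("utf-8")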
column_name_pairs = [
    ("record_date", "v_record_date"),
    ("p_string", "v_string"),
    ("p_int", "v_int"),
    destination_environment,
    destination_table,
    load_type,
    self.emr_cluster_id
]
# Extract bucket names
bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]
emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
test_s3_table = S3Table(emr_system, destination_table)
# Put landing data
self.dump_data_to_s3(
    os.path.join(test_s3_table.dir_landing_final, landing_dataset),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|"
)
M3D.load_table(*table_config, spark_params=json.dumps(spark_external_parameters))
# The psv file will still be in landing, since the move operation would be
# performed by an EMR step, which we mock here. Accordingly, the archive will
# still be empty.
landing_files = self.get_child_objects(test_s3_table.dir_landing_final)
assert len(landing_files) == 1
assert landing_files[0] == os.path.join(test_s3_table.dir_landing_final, landing_dataset)
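# For illustration only (an assumption, not the project's actual EMR step logic): the
# landing-to-archive move that the mocked step would normally perform could be
# expressed with boto3 roughly as below; bucket and key names are placeholders.
import boto3

def archive_landing_object(bucket, landing_key, archive_key):
    s3 = boto3.client("s3")
    s3.copy_object(Bucket=bucket, Key=archive_key,
                   CopySource={"Bucket": bucket, "Key": landing_key})
    s3.delete_object(Bucket=bucket, Key=landing_key)  # landing becomes empty after the move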
    destination_system,
    destination_database,
    destination_environment,
    destination_active_table,
    src_tconx_content,
    src_tconx_cl_content
)
emr_system = EMRSystem(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment
)
s3_table_active = S3Table(emr_system, destination_active_table)
s3_table_changelog = S3Table(emr_system, destination_changelog_table)
# Extract bucket names
bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]
# Put lake data for the changelog table; this should be archived.
self.dump_data_to_s3(
    os.path.join(s3_table_changelog.dir_lake_final, "changelog.parquet"),
    "t|e|s|t|a|d|i|d|a|s|m|3|d|",
)
M3D.load_table(
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_active_table,
    self.local_run_dir,
def drop_lake_out_view(self, destination_table):
    # Tag the cluster with the fully qualified view name, then delegate to S3Table.
    from m3d.hadoop.emr.s3_table import S3Table

    full_table_name = "{}.{}".format(self.db_lake_out, destination_table)
    self.add_cluster_tag(self.EMRClusterTag.TARGET_VIEW, full_table_name)
    S3Table(self, destination_table).drop_lake_out_view()
def __init__(self, emr_system, destination_table, spark_params=None, **kwargs):
    """
    Initialize representation of Hive table on S3

    :param emr_system: EMRSystem instance that hosts this table
    :param destination_table: destination table code
    :param spark_params: external spark parameters to override scon defaults
    :param kwargs: additional keyword arguments passed to the parent constructor
    """
    # call super constructor
    super(S3Table, self).__init__(
        emr_system.config,
        emr_system.source_system,
        emr_system.database,
        emr_system.environment,
        destination_table,
        **kwargs
    )

    self.emr_system = emr_system

    # parse Spark parameters
    self.spark_params = SparkParameters(self.emr_system.spark_params)
    self.spark_params.merge_spark_params_str(spark_params)

    # derived directories
    self.dir_landing_source_system = os.path.join(
def create_lake_out_view(self, destination_table):
    from m3d.hadoop.emr.s3_table import S3Table

    full_table_name = "{}.{}".format(self.db_lake_out, destination_table)
    self.add_cluster_tag(self.EMRClusterTag.TARGET_VIEW, full_table_name)
    S3Table(self, destination_table).create_lake_out_view()
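# Minimal usage sketch (not taken from the repository): constructing S3Table directly
# with the optional spark_params override described in the __init__ docstring above.
# The JSON keys and values are illustrative assumptions; emr_system and
# destination_table are assumed to be objects like those built in the test snippets.
import json

spark_overrides = json.dumps({"spark.executor.memory": "2G"})  # hypothetical override
s3_table = S3Table(emr_system, destination_table, spark_params=spark_overrides)
s3_table.create_lake_out_view()  # same call the EMRSystem wrapper above delegates to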
def create_table(self, destination_table):
    from m3d.hadoop.emr.s3_table import S3Table

    full_table_name = "{}.{}".format(self.db_lake, destination_table)
    self.add_cluster_tag(self.EMRClusterTag.TARGET_TABLE, full_table_name)
    S3Table(self, destination_table).create_tables()
def truncate_table(self, destination_table):
    from m3d.hadoop.emr.s3_table import S3Table

    full_table_name = "{}.{}".format(self.db_lake, destination_table)
    self.add_cluster_tag(self.EMRClusterTag.TARGET_TABLE, full_table_name)
    S3Table(self, destination_table).truncate_tables()
def drop_table(self, destination_table):
    from m3d.hadoop.emr.s3_table import S3Table

    full_table_name = "{}.{}".format(self.db_lake, destination_table)
    self.add_cluster_tag(self.EMRClusterTag.TARGET_TABLE, full_table_name)
    S3Table(self, destination_table).drop_tables()
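# Hedged usage sketch of the EMRSystem wrappers defined above; "my_table" is a
# placeholder table code, and emr_system is assumed to be an EMRSystem instance like
# the one constructed in the test snippets.
emr_system.create_table("my_table")          # tags the cluster, then creates the lake tables
emr_system.truncate_table("my_table")        # tags the cluster, then truncates the lake tables
emr_system.create_lake_out_view("my_table")  # tags the cluster, then creates the db_lake_out view
emr_system.drop_lake_out_view("my_table")    # tags the cluster, then drops the db_lake_out view
emr_system.drop_table("my_table")            # tags the cluster, then drops the lake tables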