def _create_emr_system(self):
    destination_system = "bdp"
    destination_database = "emr_test"
    destination_environment = "prod"

    m3d_config_file, _, _, _ = self.env_setup(
        self.local_run_dir,
        destination_system,
        destination_database,
        destination_environment
    )
    return EMRSystem(
        m3d_config_file,
        destination_system,
        destination_database,
        destination_environment,
        self.emr_cluster_id
    )
# positional arguments for M3D.create_lake_out_view(); the head of this list
# is cut off in the excerpt and is reconstructed from the env_setup values above
table_config = [
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_table
]

table_config_kwargs = {
    "emr_cluster_id": self.emr_cluster_id
}
emr_steps_completer = self.create_emr_steps_completer(
    expected_steps_count=1,
    timeout_seconds=3
)

with ConcurrentExecutor(emr_steps_completer, delay_sec=0.4):
    logging.info("Calling M3D.create_lake_out_view().")
    M3D.create_lake_out_view(*table_config, **table_config_kwargs)

emr_system = EMRSystem(*table_config[:5])
s3_table = S3Table(emr_system, destination_table)

mock_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(mock_cluster.steps)

hive_step = mock_cluster.steps[0]
assert hive_step.args[0] == "hive"
assert hive_step.args[1] == "--silent"
assert hive_step.args[2] == "-f"

actual_hql_content_in_bucket = self.get_object_content_from_s3(hive_step.args[3])
column_name_pairs = [
    ("record_date", "v_record_date"),
    ("p_string", "v_string"),
    # ... (remaining column pairs truncated in the excerpt)
]
@patch.object(EMRSystem, 'config_service', new=MockConfigService, spec=None, create=True)
def test_parses_basic_attributes_from_system_config_file(self, _, __):
    """
    Test case checks that all relevant key-values are extracted from the scon
    file and assigned to the correct member variables of the EMRSystem object.
    """
    aws_api_credentials = AWSCredentials("fake_aws_api_access_key", "fake_aws_api_secret_key")
    aws_api_credentials_file = self.local_run_dir.join("aws-credentials-emr-api.json")
    self.dump_aws_credentials(aws_api_credentials, str(aws_api_credentials_file))

    aws_s3_put_credentials = AWSCredentials("fake_aws_s3_put_access_key", "fake_aws_s3_put_secret_key")
    aws_s3_put_credentials_file = self.local_run_dir.join("aws-credentials-emr-s3_put.json")
    self.dump_aws_credentials(aws_s3_put_credentials, str(aws_s3_put_credentials_file))

    aws_s3_del_credentials = AWSCredentials("fake_aws_s3_del_access_key", "fake_aws_s3_del_secret_key")
    aws_s3_del_credentials_file = self.local_run_dir.join("aws-credentials-emr-s3_del.json")
    self.dump_aws_credentials(aws_s3_del_credentials, str(aws_s3_del_credentials_file))
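The dump_aws_credentials helper itself is not part of the excerpt. A minimal sketch, assuming AWSCredentials exposes access_key_id and secret_access_key attributes and that a flat JSON layout suffices:

import json

def dump_aws_credentials(credentials, output_path):
    # Hypothetical helper: the attribute names and the JSON layout are
    # assumptions, not taken from the original code.
    with open(output_path, "w") as out_file:
        json.dump(
            {
                "aws_access_key_id": credentials.access_key_id,
                "aws_secret_access_key": credentials.secret_access_key
            },
            out_file
        )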
def __init__(self, execution_system, algorithm_instance, algorithm_params):
    """
    Initialize the fixed-length string extractor algorithm

    :param execution_system: execution system (an EMRSystem instance)
    :param algorithm_instance: name of the algorithm instance
    :param algorithm_params: algorithm configuration
    """
    super(AlgorithmFixedLengthStringExtractor, self).__init__(
        execution_system, algorithm_instance, algorithm_params)
    self.validate_parameters()

    self.source_table = self._execution_system.db_lake + "." + self._parameters["source_table"]
    self.target_table = self._execution_system.db_lake + "." + self._parameters["target_table"]
    self.metadata_update_strategy = self._parameters.get("metadata_update_strategy", None)

    execution_system.add_cluster_tags({
        EMRSystem.EMRClusterTag.SOURCE_TABLE: self.source_table,
        EMRSystem.EMRClusterTag.TARGET_TABLE: self.target_table
    })
# source_table is read from the acon parameters, following the same pattern as
# the extractor above (the assignment is cut from this excerpt)
self.source_table = self._execution_system.db_lake + "." + self._parameters["source_table"]

# make sure not to repeat the full path in the acon file if you use the
# following concatenation logic:
# self.source_location = os.path.join(
#     "s3://", self._execution_system.bucket_lake, self._parameters["source_location"])
self.target_table = self._execution_system.db_lake + "." + self._parameters["target_table"]
self.output_dictionary = {
    "source_table": self.source_table,
    # you can use a source location with parquet files on the lake instead of a hive table
    # "source_location": self.source_location,
    "target_table": self.target_table,
    "date_from": self._parameters["date_from"],
    "date_to": self._parameters["date_to"]
}

execution_system.add_cluster_tags({
    EMRSystem.EMRClusterTag.SOURCE_TABLE: self.source_table,
    EMRSystem.EMRClusterTag.TARGET_TABLE: self.target_table
})
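Read together, the two constructors above imply a minimal shape for the algorithm's _parameters section. The sketch below lists only the keys they actually read; all values are placeholders:

# Placeholder acon parameters; every key is one the constructors above consume.
parameters_example = {
    "source_table": "my_source_table",
    "target_table": "my_target_table",
    "date_from": "2023-01-01",
    "date_to": "2023-01-31",
    "metadata_update_strategy": "some_strategy"  # optional, read with .get(...) above
}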
def create_spark_submit_str(self, spark_params, algorithm_class, file_json_s3):
    """
    Return the spark-submit string

    :param spark_params: Spark parameters for the spark-submit call
    :param algorithm_class: Spark algorithm to be executed
    :param file_json_s3: full path to the JSON file with algorithm parameters
    :return: spark-submit string to be executed in a shell
    """
    EMRSystem.validate_spark_params(spark_params)

    spark_submit = "spark-submit "
    spark_submit += "--master yarn "
    spark_submit += "--deploy-mode cluster "

    for key, value in spark_params.items():
        spark_submit += "--conf {}={} ".format(key, value)

    spark_submit += "--class {} ".format(self.spark_main_class)
    spark_submit += self.spark_jar_path + " "
    spark_submit += algorithm_class + " "
    spark_submit += file_json_s3 + " "
    spark_submit += self.storage_type

    return spark_submit
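For illustration, a call with made-up values (none of these literals come from the project) yields a command of the following shape:

# Hypothetical inputs; spark_main_class, spark_jar_path and storage_type are
# instance attributes, so their values below are placeholders too.
cmd = emr_system.create_spark_submit_str(
    {"spark.executor.instances": "2", "spark.executor.memory": "4G"},
    "AlgorithmFixedLengthStringExtractor",
    "s3://application-bucket/params/algorithm.json"
)
# cmd is now roughly:
# spark-submit --master yarn --deploy-mode cluster \
#     --conf spark.executor.instances=2 --conf spark.executor.memory=4G \
#     --class <spark_main_class> <spark_jar_path> \
#     AlgorithmFixedLengthStringExtractor s3://application-bucket/params/algorithm.json <storage_type>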
def __init__(
        self,
        config,
        source_system,
        database,
        environment,
        emr_cluster_id=None,
        spark_params=None
):
    """
    Initialize Hadoop system

    :param config: system config file
    :param source_system: destination system code
    :param database: destination database code
    :param environment: destination schema code
    :param emr_cluster_id: ID of the EMR cluster
    :param spark_params: Spark-specific parameters
    """
    # call super constructor
    super(EMRSystem, self).__init__(config, source_system, database, environment)

    self.scon_full_path = self.config_service.get_scon_path(source_system, database)

    # read the system config (scon) file
    with open(self.scon_full_path) as data_file:
        params_system = json.load(data_file)

    # S3 buckets
    self.bucket_landing = params_system["environments"][self.environment]["s3_buckets"]["landing"]
    self.bucket_lake = params_system["environments"][self.environment]["s3_buckets"]["lake"]
    self.bucket_mart_cal = params_system["environments"][self.environment]["s3_buckets"]["mart_cal"]
    self.bucket_application = params_system["environments"][self.environment]["s3_buckets"]["application"]
    self.bucket_log = params_system["environments"][self.environment]["s3_buckets"]["log"]

    # EMR default configuration
    self.default_emr_version = params_system["emr"]["default_emr_version"]
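The constructor above pins down the minimum structure of a scon file. The sketch below contains only the keys it reads; bucket names and the EMR version are placeholders:

# Minimal scon content exercised by the constructor above.
params_system_example = {
    "environments": {
        "prod": {  # keyed by self.environment
            "s3_buckets": {
                "landing": "io.bucket-landing",
                "lake": "io.bucket-lake",
                "mart_cal": "io.bucket-mart-cal",
                "application": "io.bucket-application",
                "log": "io.bucket-log"
            }
        }
    },
    "emr": {
        "default_emr_version": "emr-5.36.0"
    }
}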
def add_emr_cluster_tags(
        config,
        destination_system,
        destination_database,
        destination_environment,
        emr_cluster_id,
        cluster_tags
):
    """
    Add the given tags to a running EMR cluster

    :param config: system config file
    :param destination_system: destination system code
    :param destination_database: destination database code
    :param destination_environment: destination environment code
    :param emr_cluster_id: ID of the EMR cluster to tag
    :param cluster_tags: dictionary mapping tag names to values
    """
    from m3d.hadoop.emr.emr_system import EMRSystem

    emr_system = EMRSystem(
        config,
        destination_system,
        destination_database,
        destination_environment,
        emr_cluster_id
    )
    emr_system.add_cluster_tags(cluster_tags)
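A hypothetical invocation, with made-up paths, IDs, and tags, just to show the expected argument shapes:

# All literal values below are placeholders, not taken from the project.
add_emr_cluster_tags(
    "config/m3d/config.json",
    "bdp",
    "emr_test",
    "prod",
    "j-1A2B3C4D5E6F7",
    {"ApiMethod": "create_lake_out_view"}  # tag key/value chosen for illustration
)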