"""
Initialize representation of a semi-structured dataset on S3 without Hive metadata
:param emr_system: system config file
:param dataset_name: name of the dataset to load.
"""
# call super constructor
super(SemistructuredDataSet, self).__init__(emr_system)
# derived directories
self.source_system = dataset_name.split("_", 1)[0]
self.dataset_subdir = dataset_name.split("_", 1)[-1]
self.table_lake = self.dataset_subdir
self.db_table_lake = self.dataset_subdir
self.dir_landing_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_landing,
self.emr_system.environment,
self.source_system
)
self.dir_landing_table = os.path.join(self.dir_landing_source_system, self.dataset_subdir)
self.dir_landing_data = os.path.join(self.dir_landing_table, self.emr_system.subdir_data)
self.dir_landing_work = os.path.join(self.dir_landing_table, self.emr_system.subdir_work)
self.dir_landing_archive = os.path.join(self.dir_landing_table, self.emr_system.subdir_archive)
self.dir_landing_header = os.path.join(self.dir_landing_table, self.emr_system.subdir_header)
self.dir_landing_final = self.dir_landing_data # used for compatibility with HDFSTable
self.dir_lake_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_lake,
self.emr_system.environment,
self.source_system
)
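# Minimal illustrative sketch (not from the original source; the example name is assumed):
# the dataset name is split on the first underscore, the prefix becomes the source system
# and the remainder becomes the dataset sub-directory used in the paths above.
example_dataset_name = "mart_sales_events"
example_source_system = example_dataset_name.split("_", 1)[0]    # -> "mart"
example_dataset_subdir = example_dataset_name.split("_", 1)[-1]  # -> "sales_events"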
def __init__(self, config, source_system, database):
# store parameters
self.source_system = source_system
self.database = database
self.config = config
# create config service
self.config_service = config_service.ConfigService(config)
self.table_lake = self.dataset_subdir
self.db_table_lake = self.dataset_subdir
self.dir_landing_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_landing,
self.emr_system.environment,
self.source_system
)
self.dir_landing_table = os.path.join(self.dir_landing_source_system, self.dataset_subdir)
self.dir_landing_data = os.path.join(self.dir_landing_table, self.emr_system.subdir_data)
self.dir_landing_work = os.path.join(self.dir_landing_table, self.emr_system.subdir_work)
self.dir_landing_archive = os.path.join(self.dir_landing_table, self.emr_system.subdir_archive)
self.dir_landing_header = os.path.join(self.dir_landing_table, self.emr_system.subdir_header)
self.dir_landing_final = self.dir_landing_data # used for compatibility with HDFSTable
self.dir_lake_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_lake,
self.emr_system.environment,
self.source_system
)
self.dir_lake_table = os.path.join(self.dir_lake_source_system, self.dataset_subdir)
self.dir_lake_final = os.path.join(self.dir_lake_table, self.emr_system.subdir_data)
self.dir_lake_backup = os.path.join(self.dir_lake_table, self.emr_system.subdir_data_backup)
# apps
self.dir_apps_system = os.path.join(self.emr_system.dir_apps_loading, self.source_system)
self.dir_apps_table = os.path.join(self.dir_apps_system, self.dataset_subdir)
self.dir_apps_full_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_full_load)
self.dir_apps_delta_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_delta_load)
self.dir_apps_append_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_append_load)
def _create_load_config_filename(self):
return "{load_tag}-{environment}-{table}{extension}".format(
load_tag=self._get_load_load_tag(),
environment=self._execution_system.environment,
table=self._dataset.table_lake,
extension=ConfigService.Extensions.JSON
)
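# Illustrative sketch of the filename produced above; the load tag, environment, table
# and extension values are assumptions for the example, not taken from the source.
example_load_config_filename = "{load_tag}-{environment}-{table}{extension}".format(
    load_tag="full_load",
    environment="dev",
    table="sales_events",
    extension=".json"
)
# -> "full_load-dev-sales_events.json"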
@staticmethod
def create(
config_path,
destination_database,
destination_environment,
algorithm_instance,
ext_params_str=None
):
"""
Create algorithm configuration object from acon file. Method will discover acon file based on the
parameters passed to it.
:return: Returns algorithm configuration object of the type that is used for calling the method.
"""
# Create config service to get acon file path.
config_service = ConfigService(config_path)
acon_path = config_service.get_acon_path(
destination_database,
destination_environment,
algorithm_instance
)
acon_dict = Util.load_dict(acon_path)
if ext_params_str:
ext_params_dict = json.loads(ext_params_str)
acon_dict = Util.merge_nested_dicts(acon_dict, ext_params_dict)
return AlgorithmConfigurationHadoop(algorithm_instance, acon_dict)
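# Hypothetical usage sketch of the factory above; the config path, database, environment,
# algorithm instance name and the external parameter JSON string are all assumed values
# for illustration only.
acon_config = AlgorithmConfigurationHadoop.create(
    config_path="config/m3d.json",
    destination_database="dev_lake",
    destination_environment="dev",
    algorithm_instance="example_algorithm",
    ext_params_str='{"parameters": {"output_partitions": 10}}'
)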
def execute_hive(self, hql, return_output=False):
# Write the HQL statement to a file since it can exceed the allowed length of an EMR step parameter.
datetime_str = Util.get_formatted_utc_now(EMRSystem.DATETIME_FORMAT)
id_str = EMRSystem._generate_random_id()
hql_filename = "{}.{}{}".format(datetime_str, id_str, ConfigService.Extensions.HQL)
hql_path_local = os.path.join(self.dir_tmp_local, hql_filename)
hql_path_s3 = os.path.join(self.dir_tmp_s3, hql_filename)
with open(hql_path_local, "w") as hql_file:
hql_file.write(hql)
self.s3_util.upload_object(hql_path_local, hql_path_s3)
# Create hive command line.
hive_cmd = "hive --silent -f {}".format(hql_path_s3)
# Add step to EMR cluster.
step_name = "Hive EMR Step: datetime=\"{}\", id=\"{}\"".format(datetime_str, id_str)
emr_step_id = self.emr_cluster_client.add_step(step_name, hive_cmd)
self.emr_cluster_client.wait_for_step_completion(emr_step_id)
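# Hypothetical usage sketch (the EMRSystem instance "emr_system" and the HQL text are
# assumptions): because the statement is uploaded to S3 and run via "hive -f", callers
# can submit HQL longer than the EMR step parameter limit allows.
# emr_system.execute_hive("MSCK REPAIR TABLE dev_lake.sales_events")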
def read_acon_params(execution_system, table_name):
config_service = ConfigService(execution_system.config)
acon_path = config_service.get_acon_path(
execution_system.database,
execution_system.environment,
table_name)
acon_dict = Util.load_dict(acon_path)
return acon_dict.get(LoadHadoop.PARAMETERS_KEY, {})
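# Minimal sketch of the lookup performed above; the key name "parameters" and its contents
# are assumptions for illustration (the source only references LoadHadoop.PARAMETERS_KEY).
example_acon_dict = {"parameters": {"partition_column": "created_at"}}
example_params = example_acon_dict.get("parameters", {})
# -> {"partition_column": "created_at"}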
# derived directories
self.dir_landing_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_landing,
self.emr_system.environment,
self.source_system
)
self.dir_landing_table = os.path.join(self.dir_landing_source_system, self.table)
self.dir_landing_data = os.path.join(self.dir_landing_table, self.emr_system.subdir_data)
self.dir_landing_work = os.path.join(self.dir_landing_table, self.emr_system.subdir_work)
self.dir_landing_archive = os.path.join(self.dir_landing_table, self.emr_system.subdir_archive)
self.dir_landing_header = os.path.join(self.dir_landing_table, self.emr_system.subdir_header)
self.dir_landing_final = self.dir_landing_data # used for compatibility with HDFSTable
self.dir_lake_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_lake,
self.emr_system.environment,
self.source_system
)
self.dir_lake_table = os.path.join(self.dir_lake_source_system, self.table)
self.dir_lake_final = os.path.join(self.dir_lake_table, self.emr_system.subdir_data)
self.dir_lake_backup = os.path.join(self.dir_lake_table, self.emr_system.subdir_data_backup)
# apps
self.dir_apps_system = os.path.join(self.emr_system.dir_apps_loading, self.source_system)
self.dir_apps_table = os.path.join(self.dir_apps_system, self.table)
self.dir_apps_full_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_full_load)
self.dir_apps_delta_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_delta_load)
self.dir_apps_append_load = os.path.join(self.dir_apps_table, self.emr_system.subdir_append_load)
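# Illustrative sketch of the landing-directory layout derived above; the bucket name,
# protocol handling, environment, source system and table values are assumptions.
import os

example_dir_landing_table = os.path.join(
    "s3://" + "example-landing-bucket",
    "dev",
    "mart",
    "sales_events"
)
example_dir_landing_data = os.path.join(example_dir_landing_table, "data")
# -> "s3://example-landing-bucket/dev/mart/sales_events/data"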
emr_system.source_system,
emr_system.database,
emr_system.environment,
destination_table,
**kwargs
)
self.emr_system = emr_system
# parse Spark parameters
self.spark_params = SparkParameters(self.emr_system.spark_params)
self.spark_params.merge_spark_params_str(spark_params)
# derived directories
self.dir_landing_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_landing,
self.emr_system.environment,
self.source_system
)
self.dir_landing_table = os.path.join(self.dir_landing_source_system, self.table)
self.dir_landing_data = os.path.join(self.dir_landing_table, self.emr_system.subdir_data)
self.dir_landing_work = os.path.join(self.dir_landing_table, self.emr_system.subdir_work)
self.dir_landing_archive = os.path.join(self.dir_landing_table, self.emr_system.subdir_archive)
self.dir_landing_header = os.path.join(self.dir_landing_table, self.emr_system.subdir_header)
self.dir_landing_final = self.dir_landing_data # used for compatibility with HDFSTable
self.dir_lake_source_system = os.path.join(
ConfigService.Protocols.S3 + self.emr_system.bucket_lake,
self.emr_system.environment,
self.source_system
)
"""
Return ddic path for upload system export
:param source_system source system code
:param src_database source database code
:param source_schema upload schema code
:param source_table: upload table code
:return: ddic file for upload system export
"""
filename = "-".join([
ConfigService.Prefixes.DDIC,
source_system,
src_database,
source_schema,
source_table
]) + ConfigService.Extensions.CSV
base_path = os.path.join(self.tag_config, self.tag_table, self.tag_upload, source_system, filename)
return base_path
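# Illustrative sketch of the ddic filename assembled above; the "ddic" prefix, the ".csv"
# extension and the system, database, schema and table codes are assumptions.
example_ddic_filename = "-".join(["ddic", "mart", "db01", "schema01", "table01"]) + ".csv"
# -> "ddic-mart-db01-schema01-table01.csv"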