def test_load_table_append(self, remove_json_patch, add_tags_patch, _0, _1):
target_partitions = ["year", "month", "day"]
regex_filename = ["[0-9]{4}", "(?<=[0-9]{4})([0-9]{2})(?=[0-9]{2})", "(?<=[0-9]{6})([0-9]{2})"]
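# JSON-encoded Spark overrides that the loader is expected to forward to the EMR step
# (driver/executor sizing values below are test placeholders).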
spark_external_parameters = '''
{
"spark.driver.memory": "99G",
"spark.executor.instances": "99",
"spark.executor.memory": "90G"
}
'''
null_value = "test_null_value"
quote_character = "test_quote"
compute_table_statistics = True
verify_schema = False
data_type = DataType.STRUCTURED
reader_mode = "DROPMALFORMED"
metadata_update_strategy = "SparkRecoverPartitionsCustom"
source_system = AppendLoadConfig.destination_table.split("_", 1)[0]
table = AppendLoadConfig.destination_table.split("_", 1)[-1]
test_target_dir = "s3://{lake_bucket}/{destination_environment}/{system}/{table}/data/".format(
lake_bucket=self.default_dev_lake_bucket,
destination_environment=AppendLoadConfig.destination_environment,
system=source_system,
table=table
)
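# Append-load test fixture configured with a custom reader mode and a
# partition-recovery metadata update strategy.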
config = AppendLoadConfig(
self.local_run_dir,
self.env_setup,
target_partitions,
reader_mode=reader_mode,
metadata_update_strategy=metadata_update_strategy
)
self._write_tconx()
self.table_config = [self.config_file] + destination_params
emr_system = EMRSystem(
self.config_file,
self.destination_system,
self.destination_database,
self.destination_environment
)
# self.s3_table = S3Table(emr_system, self.destination_table)
if data_type is None:
data_type = DataType.STRUCTURED
self.dataset = DataSetFactory.create_dataset(
emr_system,
HiveTable.TableLoadType.APPEND,
data_type,
self.destination_table
)
config_filename = "append_load-{}-{}.json".format(self.destination_environment, self.dataset.table_lake)
self.config_filepath = os.path.join(self.dataset.dir_apps_append_load, config_filename)
self.db_name_lake = self.scon_emr_dict["environments"][self.destination_environment]["schemas"]["lake"]
self.expected_algorithms_jar_path = "s3://" + os.path.join(
(self.scon_emr_dict["environments"][self.destination_environment]["s3_buckets"]["application"]).strip("/"),
(self.scon_emr_dict["environments"][self.destination_environment]["s3_deployment_dir_base"]).strip("/"),
self.destination_environment,
)  # remaining path components of the original expression are truncated in this excerpt
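# app_files is assumed to hold the S3 listing of uploaded application files; exactly one
# JSON config should be present and it must match the config file path computed above.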
app_json_files = list(filter(lambda app_file: os.path.basename(app_file).endswith(".json"), app_files))
assert len(app_json_files) == 1
assert app_json_files[0] == config.config_filepath
# Check config file for Spark
actual_parameters = json.loads(self.get_object_content_from_s3(config.config_filepath))
expected_parameters = {
"target_table": "test101",
"source_dir": config.dataset.dir_landing_final,
"header_dir": config.dataset.dir_landing_header,
"target_partitions": target_partitions,
"regex_filename": regex_filename,
"file_format": "dsv",
"null_value": "test_null_value",
"quote_character": "test_quote",
"data_type": DataType.SEMISTRUCTURED,
"verify_schema": True,
"target_dir": test_target_dir,
"schema": schema
}
assert actual_parameters == expected_parameters
def _get_supported_data_types():
"""
Return the list of data types supported for loading.
:return: list of data type names
"""
return [
DataType.STRUCTURED,
DataType.SEMISTRUCTURED,
DataType.UNSTRUCTURED
]
"Acon file could not be found for table: {} and execution system: {}".format(
destination_table,
execution_system.database_type
)
)
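# load_params would normally be populated from the table's acon file; with the empty
# fallback below, the data type defaults to STRUCTURED.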
load_params = {}
if "data_type" in load_params:
available_data_types = self._get_supported_data_types()
data_type = load_params["data_type"]
if data_type not in available_data_types:
raise M3DUnsupportedDataTypeException(
message="Data Type {} not available.".format(data_type)
)
else:
data_type = DataType.STRUCTURED
dataset = DataSetFactory.create_dataset(
execution_system,
load_type,
data_type,
destination_table
)
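# available_loads is assumed to map each supported load type to its load wrapper class;
# the selected wrapper is constructed with the execution system, dataset and load parameters.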
self._load_wrapper = available_loads[load_type](
execution_system=self._execution_system,
dataset=dataset,
load_params=load_params
)
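# Tag the EMR cluster with the M3D API method that initiated this load.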
self._execution_system.add_cluster_tags({
EMRSystem.EMRClusterTag.API_METHOD: M3D.load_table.__name__,
})
def create_dataset(execution_system, load_type, data_type, dataset_name):
if data_type == DataType.STRUCTURED:
dataset = S3Table(
emr_system=execution_system,
destination_table=dataset_name
)
elif data_type == DataType.SEMISTRUCTURED:
if load_type == HiveTable.TableLoadType.APPEND:
dataset = SemistructuredDataSet(
emr_system=execution_system,
dataset_name=dataset_name
)
else:
raise M3DUnsupportedLoadTypeException(
load_type=load_type,
message="Loading algorithm {} not support for data type {}.".format(load_type, data_type)
)
else:
raise M3DUnsupportedDataTypeException(
message="Data Type {} not available.".format(data_type)
)
return dataset
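# Usage sketch (illustrative, not taken from the original source). It assumes an already
# constructed EMRSystem instance named emr_system and uses a hypothetical table name.
# create_dataset dispatches purely on DataType: structured tables come back as S3Table
# objects, semi-structured append loads as SemistructuredDataSet objects.
structured_dataset = DataSetFactory.create_dataset(
    emr_system,                          # assumed EMRSystem instance
    HiveTable.TableLoadType.APPEND,
    DataType.STRUCTURED,
    "test101"                            # hypothetical destination table name
)

semistructured_dataset = DataSetFactory.create_dataset(
    emr_system,
    HiveTable.TableLoadType.APPEND,
    DataType.SEMISTRUCTURED,
    "test101"
)
# Any non-append load type for semi-structured data raises M3DUnsupportedLoadTypeException,
# and an unrecognized data type raises M3DUnsupportedDataTypeException.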