test_lake_key = "{environment}/{source_system}/{table}/data/{obj_name}".format(
    environment=destination_environment,
    source_system=source_system,
    table=table,
    obj_name=test_lake_key_filename
)
# adding data to landing and lake directories
self.s3_resource.Bucket(bucket_landing).put_object(Key=test_land_key, Body=test_content)
self.s3_resource.Bucket(bucket_lake).put_object(Key=test_lake_key, Body=test_content)
logging.info("Calling M3D.create_table()")
M3D.create_table(*table_config_args, **table_config_kwargs)
logging.info("Calling M3D.drop_table()")
M3D.drop_table(*table_config_args, **table_config_kwargs)
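# With moto's mocked EMR, submitted job-flow steps are recorded on the in-memory
# backend, so the Hive steps generated above can be inspected directly.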
emr_backend = self.mock_emr.backends[self.default_aws_region]
fake_cluster = emr_backend.clusters[self.emr_cluster_id]
assert 3 == len(fake_cluster.steps)
# Get actual HQL statements
actual_hqls = []
for step in fake_cluster.steps:
assert ["hive", "--silent", "-f"] == step.args[0:3]
hql_file = step.args[3]
    hql_content = self.get_object_content_from_s3(hql_file)
    actual_hqls.append(hql_content)
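# Copy the tconx fixture into place before exercising create_lake_out_view().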
py.path.local(tconx_file).write(py.path.local(tconx_src_path).read())
table_config = [
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_table
]
table_config_kwargs = {
    "emr_cluster_id": self.emr_cluster_id
}
with pytest.raises(M3DDatabaseException) as exc_info:
    M3D.create_lake_out_view(*table_config, **table_config_kwargs)
assert "lake_out view name does not exist" == str(exc_info.value)
def load_table(self, emr_cluster_id, spark_parameters=None):
    if spark_parameters is None:
        M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id]))
    else:
        M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id, spark_parameters]))
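# Illustrative usage (hypothetical cluster id and Spark settings, not taken
# from the test suite):
#   self.load_table("j-XXXXXXXXXXXXX")
#   self.load_table("j-XXXXXXXXXXXXX", spark_parameters=json.dumps({"spark.executor.memory": "4G"}))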
table_config = [
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_table
]
table_config_kwargs = {
    "emr_cluster_id": self.emr_cluster_id
}
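# create_lake_out_view() blocks until its EMR step finishes, so a completer
# runs concurrently to mark the mocked step as done.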
emr_steps_completer = self.create_emr_steps_completer(expected_steps_count=1, timeout_seconds=3)
with ConcurrentExecutor(emr_steps_completer, delay_sec=0.4):
logging.info("Calling M3D.create_lake_out_view().")
M3D.create_lake_out_view(*table_config, **table_config_kwargs)
emr_system = EMRSystem(*table_config[:5])
s3_table = S3Table(emr_system, destination_table)
mock_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]
assert 1 == len(mock_cluster.steps)
hive_step = mock_cluster.steps[0]
assert hive_step.args[0] == "hive"
assert hive_step.args[1] == "--silent"
assert hive_step.args[2] == "-f"
actual_hql_content_in_bucket = self.get_object_content_from_s3(hive_step.args[3])
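# The fetched HQL is expected to hold the CREATE VIEW statement for the lake_out view.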
table_config_args = [
    m3d_config_file,
    destination_system,
    destination_database,
    destination_environment,
    destination_table
]
table_config_kwargs = {
    "emr_cluster_id": self.emr_cluster_id
}
db_lake_out = scon_emr_dict["environments"][destination_environment]["schemas"]["lake_out"]
lake_out = "bi_test101"
logging.info("Calling M3D.drop_lake_out_view()")
M3D.drop_lake_out_view(*table_config_args, **table_config_kwargs)
emr_backend = self.mock_emr.backends[self.default_aws_region]
fake_cluster = emr_backend.clusters[self.emr_cluster_id]
assert 1 == len(fake_cluster.steps)
hive_step = fake_cluster.steps[0]
assert hive_step.args[0] == "hive"
assert hive_step.args[1] == "--silent"
assert hive_step.args[2] == "-f"
actual_hql_content_in_bucket = self.get_object_content_from_s3(hive_step.args[3])
expected_hql = "DROP VIEW IF EXISTS {}.{};".format(db_lake_out, lake_out)
assert expected_hql == actual_hql_content_in_bucket
    db_cd=db_lake,
    source_system=source_system,
    table=table
)
lake_data_dir = os.path.join(lake_dir, "data")
lake_data_key = os.path.join(lake_data_dir, "new_lake_dump")
# adding data to landing and lake directories
self.s3_resource.Bucket(bucket_landing).put_object(Key=landing_data_key, Body=test_content)
self.s3_resource.Bucket(bucket_landing).put_object(Key=landing_archive_key, Body=test_content)
self.s3_resource.Bucket(bucket_landing).put_object(Key=landing_work_key, Body=test_content)
self.s3_resource.Bucket(bucket_lake).put_object(Key=lake_data_key, Body=test_content)
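# truncate_table() is expected to clear the lake data while leaving the
# landing data, archive, and work areas untouched.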
logging.info("Calling M3D.truncate_table()")
M3D.truncate_table(*table_config_args, **table_config_kwargs)
emr_backend = self.mock_emr.backends[self.default_aws_region]
fake_cluster = emr_backend.clusters[self.emr_cluster_id]
assert len(fake_cluster.steps) == 2
# Get actual HQL statements
actual_hqls = []
for step in fake_cluster.steps:
assert ["hive", "--silent", "-f"] == step.args[0:3]
hql_file = step.args[3]
hql_content = self.get_object_content_from_s3(hql_file)
actual_hqls.append(hql_content)