Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
"emr_cluster_id": "j-D1LSS423N",
},
"algorithm": {
"python_class": "QueryPartitionMaterialization",
"parameters": {
"view": view_name,
"target_partitions": target_partitions,
"date_from": date_from,
"date_to": date_to
}
}
}
emr_system = self._create_emr_system()
configuration = self._create_algorithm_configuration(acon_dict)
algorithm = AlgorithmPartitionMaterialization.RangePartitionMaterialization(
emr_system,
configuration.get_algorithm_instance(),
configuration.get_algorithm_params()
)
generated_params = algorithm.build_params()
expected_source_view = self._create_full_source_view_name(view_name)
expected_target_table = self._create_full_target_table_name(view_name)
expected_params = {
self.KEYS.SOURCE_TABLE: expected_source_view,
self.KEYS.TARGET_TABLE: expected_target_table,
self.KEYS.TARGET_PARTITIONS: target_partitions,
self.KEYS.DATE_FROM: date_from,
self.KEYS.DATE_TO: date_to
}
import pytest
from mock import patch
from m3d.hadoop.algorithm.algorithm_configuration_hadoop import AlgorithmConfigurationHadoop
from m3d.hadoop.algorithm.algorithm_partition_materialization import AlgorithmPartitionMaterialization
from m3d.hadoop.emr.emr_system import EMRSystem
from test.core.emr_system_unit_test_base import EMRSystemUnitTestBase
class TestAlgorithmPartitionMaterialization(EMRSystemUnitTestBase):
KEYS = AlgorithmPartitionMaterialization.BasePartitionMaterialization.ConfigKeys
@pytest.mark.algo
@patch("m3d.hadoop.emr.emr_cluster_client.EMRClusterClient._do_add_emr_cluster_tags")
def test_build_full_materialization_params(self, add_tags_patch):
view_name = "par_mat_test_view"
target_partitions = ["year", "month"]
metadata_update_strategy = "SparkRecoverPartitionsCustom"
acon_dict = {
"environment": {
"emr_cluster_id": "j-D1LSS423N",
},
"algorithm": {
"python_class": "FullPartitionMaterialization",
"parameters": {
"view": view_name,
def _get_supported_emr_algorithms():
    """
    Build the registry of algorithms that can run on EMR.

    :return: dict mapping the algorithm name used in configuration to the
             class implementing it
    """
    materialization = AlgorithmPartitionMaterialization
    registry = [
        ("AlgorithmGzipDecompressionBytesEMR", AlgorithmGzipDecompressionEMR),
        ("AlgorithmScalaRunner", AlgorithmScalaRunner),
        ("AlgorithmPartitionFullMaterialization", materialization.FullPartitionMaterialization),
        ("AlgorithmPartitionQueryMaterialization", materialization.QueryPartitionMaterialization),
        ("AlgorithmPartitionRangeMaterialization", materialization.RangePartitionMaterialization),
        ("AlgorithmFixedLengthStringExtractor", AlgorithmFixedLengthStringExtractor),
        ("AlgorithmNestedFlattener", AlgorithmNestedFlattener),
        ("AlgorithmAlgorithmTemplate", AlgorithmAlgorithmTemplate),
    ]
    # dict() keeps the pair order, so iteration order matches the listing above.
    return dict(registry)
def __init__(self, execution_system, algorithm_instance, algorithm_params):
    """
    Set up the Range Partition Materialization algorithm.

    No range-specific state is created here; construction is delegated
    entirely to the next initializer in the MRO.

    :param execution_system: an instance of EMRSystem
    :param algorithm_instance: name of the algorithm instance
    :param algorithm_params: algorithm configuration
    """
    parent = super(AlgorithmPartitionMaterialization.RangePartitionMaterialization, self)
    parent.__init__(execution_system, algorithm_instance, algorithm_params)
def _get_supported_emr_algorithms():
    """
    Collect the algorithms available on EMR.

    :return: dict keyed by configuration algorithm name, whose values are
             the implementing classes
    """
    partitioned = AlgorithmPartitionMaterialization
    algorithms = {}
    # Entries are inserted in a fixed order; callers iterating the dict see this order.
    algorithms["AlgorithmGzipDecompressionBytesEMR"] = AlgorithmGzipDecompressionEMR
    algorithms["AlgorithmScalaRunner"] = AlgorithmScalaRunner
    algorithms["AlgorithmPartitionFullMaterialization"] = partitioned.FullPartitionMaterialization
    algorithms["AlgorithmPartitionQueryMaterialization"] = partitioned.QueryPartitionMaterialization
    algorithms["AlgorithmPartitionRangeMaterialization"] = partitioned.RangePartitionMaterialization
    algorithms["AlgorithmFixedLengthStringExtractor"] = AlgorithmFixedLengthStringExtractor
    algorithms["AlgorithmNestedFlattener"] = AlgorithmNestedFlattener
    algorithms["AlgorithmAlgorithmTemplate"] = AlgorithmAlgorithmTemplate
    return algorithms
def __init__(self, execution_system, algorithm_instance, algorithm_params):
    """
    Set up the Query Partition Materialization algorithm.

    This initializer adds no query-specific state of its own; it only
    forwards its arguments to the next initializer in the MRO.

    :param execution_system: an instance of EMRSystem
    :param algorithm_instance: name of the algorithm instance
    :param algorithm_params: algorithm configuration
    """
    parent = super(AlgorithmPartitionMaterialization.QueryPartitionMaterialization, self)
    parent.__init__(execution_system, algorithm_instance, algorithm_params)
def __init__(self, execution_system, algorithm_instance, algorithm_params):
    """
    Set up state shared by all partition-materialization algorithms.

    After the parent initializer runs, this reads the view name, the target
    partitions and the optional metadata update strategy from
    ``self._parameters``. It derives the fully-qualified source view (in
    ``execution_system.db_mart_mod``) and target table (in
    ``execution_system.db_mart_cal``), then registers both as cluster tags
    on the execution system.

    :param execution_system: an instance of EMRSystem object
    :param algorithm_instance: name of the algorithm instance
    :param algorithm_params: algorithm configuration
    :raises KeyError: if the view or target partitions are missing from
                      ``self._parameters``
    """
    super(AlgorithmPartitionMaterialization.BasePartitionMaterialization, self).__init__(
        execution_system, algorithm_instance, algorithm_params
    )

    params = self._parameters
    keys = self.ConfigKeys

    # Required settings: a missing key raises KeyError.
    view = params[keys.VIEW]
    self.target_partitions = params[keys.TARGET_PARTITIONS]
    # Optional setting: falls back to None when absent.
    self.metadata_update_strategy = params.get(keys.METADATA_UPDATE_STRATEGY, None)

    # The same view name is qualified with two different databases.
    self.source_view = "{}.{}".format(execution_system.db_mart_mod, view)
    self.target_table = "{}.{}".format(execution_system.db_mart_cal, view)

    tags = {
        EMRSystem.EMRClusterTag.SOURCE_VIEW: self.source_view,
        EMRSystem.EMRClusterTag.TARGET_TABLE: self.target_table,
    }
    execution_system.add_cluster_tags(tags)
def __init__(self, execution_system, algorithm_instance, algorithm_params):
    """
    Set up the Full Partition Materialization algorithm.

    Nothing beyond the inherited setup is needed, so the arguments are
    passed straight through to the next initializer in the MRO.

    :param execution_system: an instance of EMRSystem
    :param algorithm_instance: name of the algorithm instance
    :param algorithm_params: algorithm configuration
    """
    parent = super(AlgorithmPartitionMaterialization.FullPartitionMaterialization, self)
    parent.__init__(execution_system, algorithm_instance, algorithm_params)