How to use the m3d.M3D.load_table function in m3d

To help you get started, we’ve selected a few M3D.load_table examples based on popular ways it is used in public projects.

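Taken together, the snippets below share the same call shape: the positional arguments name the m3d config file, the destination system, database, environment and table, the load type, and the EMR cluster id, with optional Spark overrides passed through spark_params. The sketch below is a minimal, hedged illustration of that shape; every value (including the load type string and the import path) is a placeholder, not something defined by the library.

from m3d import M3D  # exact import path may differ; the snippets below reference M3D directly

# All values are placeholders standing in for real project configuration.
M3D.load_table(
    "config/m3d/config.json",   # m3d_config_file
    "emr",                      # destination_system
    "mart",                     # destination_database
    "dev",                      # destination_environment
    "my_table",                 # destination_table
    "full_load",                # load_type (placeholder; valid values come from the m3d configuration)
    "j-XXXXXXXXXXXXX",          # emr_cluster_id
    spark_params='{"spark.executor.instances": "2"}'  # optional JSON string of Spark settings
)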

github adidas/m3d-api/test/integration/test_load_table_full_s3.py
        bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]
        emr_system = EMRSystem(
            m3d_config_file,
            destination_system,
            destination_database,
            destination_environment
        )
        test_s3_table = S3Table(emr_system, destination_table)

        # Put landing data
        self.dump_data_to_s3(
            os.path.join(test_s3_table.dir_landing_final, landing_dataset),
            "t|e|s|t|a|d|i|d|a|s|m|3|d|"
        )

        M3D.load_table(*table_config)

        # Since data move operations are offloaded to EMR steps, dir_landing_final will still
        # contain the old files and dir_landing_archive will not contain any new files.
        landing_files = self.get_child_objects(test_s3_table.dir_landing_final)
        assert len(landing_files) == 1
        assert landing_files[0] == os.path.join(test_s3_table.dir_landing_final, landing_dataset)

        landing_archive_files = self.get_child_objects(test_s3_table.dir_landing_archive)
        assert len(landing_archive_files) == 0

        # Check EMR steps.
        fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]

        assert 1 == len(fake_cluster.steps)

        expected_algorithms_jar_path = "s3://" + bucket_application + os.path.join(
github adidas/m3d-api/test/integration/test_load_table_full_s3.py
        emr_system = EMRSystem(
            m3d_config_file,
            destination_system,
            destination_database,
            destination_environment
        )
        test_s3_table = S3Table(emr_system, destination_table)

        # Put landing data
        self.dump_data_to_s3(
            os.path.join(test_s3_table.dir_landing_final, landing_dataset),
            "t|e|s|t|a|d|i|d|a|s|m|3|d|"
        )

        M3D.load_table(*table_config, spark_params=json.dumps(spark_external_parameters))

        # The psv file will still be in landing since the move operation should be
        # performed by an EMR step, which we mock here. Accordingly, the archive will
        # still be empty.
        landing_files = self.get_child_objects(test_s3_table.dir_landing_final)
        assert len(landing_files) == 1
        assert landing_files[0] == os.path.join(test_s3_table.dir_landing_final, landing_dataset)

        landing_archive_files = self.get_child_objects(test_s3_table.dir_landing_archive)
        assert len(landing_archive_files) == 0

        # Check EMR steps.
        fake_cluster = self.mock_emr.backends[self.default_aws_region].clusters[self.emr_cluster_id]

        assert 1 == len(fake_cluster.steps)
github adidas/m3d-api/test/integration/test_load_table_append_s3.py
    def load_table(self, emr_cluster_id, spark_parameters=None):
        if spark_parameters is None:
            M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id]))
        else:
            M3D.load_table(*(self.table_config + [self.load_type, emr_cluster_id, spark_parameters]))
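
For reference, a hedged sketch of how the pieces the helper concatenates might look in a test; every value is a placeholder for the real fixtures, and table_config is assumed to hold the config file, system, database, environment and table name, as in the snippets above.

from m3d import M3D  # exact import path may differ
import json

# Placeholder values standing in for the test fixtures above.
table_config = ["config/m3d/config.json", "emr", "mart", "dev", "my_table"]
spark_parameters = json.dumps({"spark.executor.instances": "2"})

M3D.load_table(*(table_config + ["append_load", "j-XXXXXXXXXXXXX", spark_parameters]))
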
github adidas/m3d-api/test/integration/test_load_table_delta_s3.py
            destination_database,
            destination_environment
        )
        s3_table_active = S3Table(emr_system, destination_active_table)
        s3_table_changelog = S3Table(emr_system, destination_changelog_table)

        # Extract bucket names
        bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]

        # Put lake data for the changelog table; this should be archived
        self.dump_data_to_s3(
            os.path.join(s3_table_changelog.dir_lake_final, "changelog.parquet"),
            "t|e|s|t|a|d|i|d|a|s|m|3|d|",
        )

        M3D.load_table(
            m3d_config_file,
            destination_system,
            destination_database,
            destination_environment,
            destination_active_table,
            load_type,
            self.emr_cluster_id,
            spark_params=spark_external_parameters
        )

        filename_json = "delta_load-{environment}-{table}.json".format(
            environment=destination_environment,
            table=destination_active_table
        )

        # Checking configuration file for m3d-engine
github adidas/m3d-api/test/integration/test_load_table_delta_s3.py
            destination_database,
            destination_environment
        )
        s3_table_active = S3Table(emr_system, destination_active_table)
        s3_table_changelog = S3Table(emr_system, destination_changelog_table)

        # Extract bucket names
        bucket_application = scon_emr_dict["environments"][destination_environment]["s3_buckets"]["application"]

        # Put lake data for the changelog table; this should be archived
        self.dump_data_to_s3(
            os.path.join(s3_table_changelog.dir_lake_final, "changelog.parquet"),
            "t|e|s|t|a|d|i|d|a|s|m|3|d|",
        )

        M3D.load_table(
            m3d_config_file,
            destination_system,
            destination_database,
            destination_environment,
            destination_active_table,
            load_type,
            self.emr_cluster_id,
            spark_params=json.dumps(spark_external_parameters)
        )

        filename_json = "delta_load-{environment}-{table}.json".format(
            environment=destination_environment,
            table=destination_active_table
        )

        # Checking configuration file for m3d-engine
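
The two delta snippets differ mainly in how the Spark overrides are passed: the first hands spark_external_parameters to spark_params directly, the second serializes it with json.dumps first. A hedged sketch of the serialized form, with made-up Spark settings, might look like this:

import json

# Illustrative Spark overrides only; the real keys depend on the m3d-engine job being run.
spark_external_parameters = {
    "spark.executor.instances": "2",
    "spark.executor.memory": "4G"
}
spark_params = json.dumps(spark_external_parameters)
# spark_params is then passed to M3D.load_table as in the snippet above.
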
github adidas/m3d-api/m3d/hadoop/load/load_executor_hadoop.py
        dataset = DataSetFactory.create_dataset(
            execution_system,
            load_type,
            data_type,
            destination_table
        )

        self._load_wrapper = available_loads[load_type](
            execution_system=self._execution_system,
            dataset=dataset,
            load_params=load_params
        )

        self._execution_system.add_cluster_tags({
            EMRSystem.EMRClusterTag.API_METHOD: M3D.load_table.__name__,
            EMRSystem.EMRClusterTag.LOAD_TYPE: load_type,
            EMRSystem.EMRClusterTag.TARGET_TABLE: destination_table
        })
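
For orientation, the executor above looks the load implementation up in available_loads by load_type and instantiates it with the execution system, dataset and load parameters. The self-contained sketch below illustrates that lookup-and-instantiate pattern in isolation; the class names and dictionary keys are hypothetical, not m3d-api's actual load types.

# Hypothetical stand-ins for m3d-api's load wrapper classes.
class ExampleFullLoad:
    def __init__(self, execution_system, dataset, load_params):
        self.execution_system = execution_system
        self.dataset = dataset
        self.load_params = load_params

class ExampleDeltaLoad(ExampleFullLoad):
    pass

# Mapping from load type name to wrapper class, mirroring available_loads above.
available_loads = {
    "full_load": ExampleFullLoad,
    "delta_load": ExampleDeltaLoad,
}

def build_load_wrapper(load_type, execution_system, dataset, load_params):
    # Same lookup-and-instantiate pattern as in load_executor_hadoop.py.
    return available_loads[load_type](
        execution_system=execution_system,
        dataset=dataset,
        load_params=load_params,
    )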