How to use the awswrangler.spark.Spark class in awswrangler

To help you get started, we’ve selected a few awswrangler examples based on popular ways it is used in public projects.

github awslabs / aws-data-wrangler / awswrangler / spark.py
def _flatten_struct_column(path: str, dtype: str) -> List[Tuple[str, str]]:
        dtype = dtype[7:-1]  # Cutting off "struct<" and ">"
        cols: List[Tuple[str, str]] = []
        struct_acc: int = 0
        path_child: str
        dtype_child: str
        aux: str = ""
        for c, i in zip(dtype, range(len(dtype), 0, -1)):  # Zipping a descendant ID for each letter
            if ((c == ",") and (struct_acc == 0)) or (i == 1):
                if i == 1:
                    aux += c
                path_child, dtype_child = Spark._parse_aux(path=path, aux=aux)
                if Spark._is_struct(dtype=dtype_child):
                    cols += Spark._flatten_struct_column(path=path_child, dtype=dtype_child)  # Recursion
                elif Spark._is_array(dtype=dtype):
                    cols.append((path, "array"))
                else:
                    cols.append((path_child, dtype_child))
                aux = ""
            elif c == "<":
                aux += c
                struct_acc += 1
            elif c == ">":
                aux += c
                struct_acc -= 1
            else:
                aux += c
        return cols
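The loop above splits the text of a struct dtype string (e.g. struct<a:int,b:struct<c:double>>) on commas that sit at nesting depth zero, using struct_acc to track "<" and ">". A rough standalone sketch of that same splitting idea (the split_top_level helper is hypothetical, not part of awswrangler):

```python
# Minimal sketch of splitting a Spark struct dtype string on top-level commas only.
# `split_top_level` is a hypothetical helper for illustration, not part of awswrangler.
def split_top_level(dtype: str) -> list:
    fields, depth, aux = [], 0, ""
    for c in dtype:
        if c == "," and depth == 0:   # comma outside any nested struct/array/map
            fields.append(aux)
            aux = ""
            continue
        if c == "<":
            depth += 1
        elif c == ">":
            depth -= 1
        aux += c
    if aux:
        fields.append(aux)
    return fields

# "struct<" and ">" already stripped, as in _flatten_struct_column:
print(split_top_level("a:int,b:struct<c:double,d:string>,e:array<int>"))
# ['a:int', 'b:struct<c:double,d:string>', 'e:array<int>']
```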
github awslabs / aws-data-wrangler / awswrangler / spark.py
def flatten(dataframe: DataFrame,
                explode_outer: bool = True,
                explode_pos: bool = True,
                name: str = "root") -> Dict[str, DataFrame]:
        """
        Convert a complex nested DataFrame into one (or many) flat DataFrames.
        If a column is a struct, it is flattened directly.
        If a column is an array or map, child DataFrames are created at different granularities.
        :param dataframe: Spark DataFrame
        :param explode_outer: Should null values in arrays be preserved?
        :param explode_pos: Create columns with the positional index of the exploded array/map
        :param name: The name of the root DataFrame
        :return: A dictionary with the names as keys and the DataFrames as values
        """
        cols_exprs: List[Tuple[str, str, str]] = Spark._flatten_struct_dataframe(df=dataframe,
                                                                                 explode_outer=explode_outer,
                                                                                 explode_pos=explode_pos)
        exprs_arr: List[str] = [x[2] for x in cols_exprs if Spark._is_array_or_map(x[1])]
        exprs: List[str] = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1])]
        dfs: Dict[str, DataFrame] = {name: dataframe.selectExpr(exprs)}
        exprs = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1]) and not x[0].endswith("_pos")]
        for expr in exprs_arr:
            df_arr = dataframe.selectExpr(exprs + [expr])
            name_new: str = Spark._build_name(name=name, expr=expr)
            dfs_new = Spark.flatten(dataframe=df_arr,
                                    explode_outer=explode_outer,
                                    explode_pos=explode_pos,
                                    name=name_new)
            dfs = {**dfs, **dfs_new}
        return dfs
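As a hedged illustration of calling flatten, the sketch below builds a small nested DataFrame and flattens it. The column names and the derived child-DataFrame key are made up, only the root key is guaranteed by the code above, and the direct Spark.flatten call assumes it is exposed as a static method, as the recursive call suggests.

```python
# Illustrative sketch only: column names and the derived child key are assumptions;
# only the "root" key is guaranteed by the flatten code above.
from awswrangler.spark import Spark  # assumes the pre-1.0 module layout shown here
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
nested_df = spark.createDataFrame(
    [(1, [("a", 2)]), (2, None)],
    "id int, orders array<struct<sku:string,qty:int>>",
)

dfs = Spark.flatten(dataframe=nested_df, explode_outer=True, explode_pos=True, name="root")
for key, flat_df in dfs.items():
    print(key)            # "root" plus one entry per exploded array/map column
    flat_df.printSchema()
```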
github awslabs / aws-data-wrangler / awswrangler / session.py
def spark(self):
        if not PYSPARK_INSTALLED:
            self._spark = None
        elif not self._spark:
            self._spark = Spark(session=self)
        return self._spark
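The spark accessor above lazily instantiates the Spark helper and returns None when PySpark is not installed. A minimal usage sketch, assuming the pre-1.0 Session API (the spark_session keyword name is an assumption):

```python
# Minimal sketch, assuming the pre-1.0 awswrangler Session API
# (the spark_session keyword name is an assumption).
import awswrangler
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()
session = awswrangler.Session(spark_session=spark_session)

if session.spark is None:
    # PYSPARK_INSTALLED was falsy, so the accessor returned None
    raise RuntimeError("PySpark is not available in this environment")

df = spark_session.createDataFrame([(1, ["a", "b"])], "id int, tags array<string>")
dfs = session.spark.flatten(dataframe=df)
print(list(dfs.keys()))
```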
github awslabs / aws-data-wrangler / awswrangler / spark.py
        :param table: The name of the desired Redshift table
        :param iam_role: AWS IAM role with the related permissions
        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"] (https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html)
        :param distkey: Specifies a column name or positional number for the distribution key
        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
        :param sortkey: List of columns to be sorted
        :param min_num_partitions: Minimum number of partitions
        :param mode: append or overwrite
        :return: None
        """
        logger.debug(f"Minimum number of partitions : {min_num_partitions}")
        if path[-1] != "/":
            path += "/"
        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
        spark: SparkSession = self._session.spark_session
        casts: Dict[str, str] = Spark._extract_casts(dataframe.dtypes)
        dataframe = Spark.date2timestamp(dataframe)
        dataframe.cache()
        num_rows: int = dataframe.count()
        logger.info(f"Number of rows: {num_rows}")
        num_partitions: int
        if num_rows < MIN_NUMBER_OF_ROWS_TO_DISTRIBUTE:
            num_partitions = 1
        else:
            num_slices: int = self._session.redshift.get_number_of_slices(redshift_conn=connection)
            logger.debug(f"Number of slices on Redshift: {num_slices}")
            num_partitions = num_slices
            while num_partitions < min_num_partitions:
                num_partitions += num_slices
        logger.debug(f"Number of partitions calculated: {num_partitions}")
        spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        session_primitives = self._session.primitives
        par_col_name: str = "aws_data_wrangler_internal_partition_id"
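This fragment appears to come from Spark.to_redshift: it wipes the S3 staging path, caches and counts the DataFrame, and picks a partition count that is a multiple of the cluster's slice count and at least min_num_partitions. Assuming that, a call might look like the hedged sketch below; every literal value is a placeholder, and the schema parameter is an assumption not shown in the fragment.

```python
# Hedged sketch of a call matching the parameters documented above.
# All literal values are placeholders; `schema` is assumed, not shown in the fragment.
session.spark.to_redshift(
    dataframe=df,                         # a Spark DataFrame prepared earlier
    path="s3://my-bucket/temp/",          # S3 staging prefix (wiped by delete_objects above)
    connection=redshift_conn,             # an open Redshift connection obtained elsewhere
    schema="public",                      # assumption
    table="my_table",
    iam_role="arn:aws:iam::123456789012:role/my-redshift-role",
    diststyle="AUTO",
    sortstyle="COMPOUND",
    min_num_partitions=2,
    mode="overwrite",
)
```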
github awslabs / aws-data-wrangler / awswrangler / spark.py
def _flatten_struct_dataframe(df: DataFrame,
                                  explode_outer: bool = True,
                                  explode_pos: bool = True) -> List[Tuple[str, str, str]]:
        explode: str = "EXPLODE_OUTER" if explode_outer is True else "EXPLODE"
        explode = f"POS{explode}" if explode_pos is True else explode
        cols: List[Tuple[str, str]] = []
        for path, dtype in df.dtypes:
            if Spark._is_struct(dtype=dtype):
                cols += Spark._flatten_struct_column(path=path, dtype=dtype)
            elif Spark._is_array(dtype=dtype):
                cols.append((path, "array"))
            elif Spark._is_map(dtype=dtype):
                cols.append((path, "map"))
            else:
                cols.append((path, dtype))
        cols_exprs: List[Tuple[str, str, str]] = []
        expr: str
        for path, dtype in cols:
            path_under = path.replace('.', '_')
            if Spark._is_array(dtype):
                if explode_pos:
                    expr = f"{explode}({path}) AS ({path_under}_pos, {path_under})"
                else:
                    expr = f"{explode}({path}) AS {path_under}"