def _flatten_struct_column(path: str, dtype: str) -> List[Tuple[str, str]]:
dtype = dtype[7:-1] # Cutting off "struct<" and ">"
cols: List[Tuple[str, str]] = []
struct_acc: int = 0
path_child: str
dtype_child: str
aux: str = ""
for c, i in zip(dtype, range(len(dtype), 0, -1)):  # Pair each character with a descending counter (i == 1 on the last character)
if ((c == ",") and (struct_acc == 0)) or (i == 1):
if i == 1:
aux += c
path_child, dtype_child = Spark._parse_aux(path=path, aux=aux)
if Spark._is_struct(dtype=dtype_child):
cols += Spark._flatten_struct_column(path=path_child, dtype=dtype_child) # Recursion
elif Spark._is_array(dtype=dtype):
cols.append((path, "array"))
else:
cols.append((path_child, dtype_child))
aux = ""
elif c == "<":
aux += c
struct_acc += 1
elif c == ">":
aux += c
struct_acc -= 1
else:
aux += c
return cols
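For context, here is a small self-contained pyspark sketch of the dtype strings this parser receives; the schema, column names, and values are made up for illustration.
# Illustration only: the (path, dtype) pairs handled above come from
# DataFrame.dtypes, where nested structs are rendered as "struct<...>".
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()
sample = spark_session.createDataFrame(
    [(("alice", 30),)],
    schema="person struct<name:string,age:bigint>",
)
print(sample.dtypes)  # [('person', 'struct<name:string,age:bigint>')]
# After cutting off "struct<" and ">", the character loop splits the remainder
# on top-level commas, yielding pairs such as ('person.name', 'string')
# and ('person.age', 'bigint').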
def flatten(dataframe: DataFrame,
explode_outer: bool = True,
explode_pos: bool = True,
name: str = "root") -> Dict[str, DataFrame]:
"""
Convert a complex nested DataFrame into one (or many) flat DataFrames.
If a column is a struct, it is flattened directly.
If a column is an array or map, child DataFrames are created at different granularities.
:param dataframe: Spark DataFrame
:param explode_outer: Preserve rows whose arrays/maps are null or empty (uses the *_OUTER explode variants)?
:param explode_pos: Also create a position column with the index of each exploded element (uses POSEXPLODE)
:param name: The name of the root DataFrame
:return: A dictionary with the DataFrame names as keys and the DataFrames as values
"""
cols_exprs: List[Tuple[str, str, str]] = Spark._flatten_struct_dataframe(df=dataframe,
explode_outer=explode_outer,
explode_pos=explode_pos)
exprs_arr: List[str] = [x[2] for x in cols_exprs if Spark._is_array_or_map(x[1])]
exprs: List[str] = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1])]
dfs: Dict[str, DataFrame] = {name: dataframe.selectExpr(exprs)}
exprs = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1]) and not x[0].endswith("_pos")]
for expr in exprs_arr:
df_arr = dataframe.selectExpr(exprs + [expr])
name_new: str = Spark._build_name(name=name, expr=expr)
dfs_new = Spark.flatten(dataframe=df_arr,
explode_outer=explode_outer,
explode_pos=explode_pos,
name=name_new)
dfs = {**dfs, **dfs_new}
return dfs
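A usage sketch for flatten(); the import path and the sample nested schema are assumptions, and flatten() is invoked on the class because the snippet itself calls Spark.flatten(...) that way.
# Usage sketch (assumed import path; sample data is made up).
from awswrangler.spark import Spark  # assumption: module path of the Spark class above
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()
nested = spark_session.createDataFrame(
    [(1, ["a", "b"]), (2, None)],
    schema="id bigint, tags array<string>",
)
dfs = Spark.flatten(dataframe=nested, explode_outer=True, explode_pos=True, name="root")
for table_name, flat_df in dfs.items():
    # One entry for the root DataFrame plus one per exploded array/map column.
    print(table_name)
    flat_df.show()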
def spark(self):
if not PYSPARK_INSTALLED:
self._spark = None
elif not self._spark:
self._spark = Spark(session=self)
return self._spark
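The PYSPARK_INSTALLED flag referenced above is not defined in this snippet; a common way to define such an optional-dependency flag (an assumption, not necessarily how this library does it) is:
# Assumed definition of the optional-dependency flag used by the property above.
try:
    import pyspark  # noqa: F401
    PYSPARK_INSTALLED: bool = True
except ImportError:
    PYSPARK_INSTALLED = False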
:param table: The name of the desired Redshift table
:param iam_role: AWS IAM role with the related permissions
:param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"] (https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html)
:param distkey: Specifies a column name or positional number for the distribution key
:param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
:param sortkey: List of columns to be sorted
:param min_num_partitions: Minimal number of partitions
:param mode: append or overwrite
:return: None
"""
logger.debug(f"Minimum number of partitions : {min_num_partitions}")
if path[-1] != "/":
path += "/"
self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
spark: SparkSession = self._session.spark_session
casts: Dict[str, str] = Spark._extract_casts(dataframe.dtypes)
dataframe = Spark.date2timestamp(dataframe)
dataframe.cache()
num_rows: int = dataframe.count()
logger.info(f"Number of rows: {num_rows}")
num_partitions: int
if num_rows < MIN_NUMBER_OF_ROWS_TO_DISTRIBUTE:
num_partitions = 1
else:
num_slices: int = self._session.redshift.get_number_of_slices(redshift_conn=connection)
logger.debug(f"Number of slices on Redshift: {num_slices}")
num_partitions = num_slices
while num_partitions < min_num_partitions:
num_partitions += num_slices
logger.debug(f"Number of partitions calculated: {num_partitions}")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
session_primitives = self._session.primitives
par_col_name: str = "aws_data_wrangler_internal_partition_id"
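A standalone sketch of the partition-count rule above; the helper name, the threshold value, and the sample numbers are illustrative assumptions, not part of the library.
# Sketch of the partition-count rule used above (illustrative names;
# the real MIN_NUMBER_OF_ROWS_TO_DISTRIBUTE constant may differ).
MIN_ROWS_TO_DISTRIBUTE = 1000  # assumed threshold


def calc_num_partitions(num_rows: int, num_slices: int, min_num_partitions: int) -> int:
    """Return the smallest multiple of the Redshift slice count meeting the minimum."""
    if num_rows < MIN_ROWS_TO_DISTRIBUTE:
        return 1
    num_partitions = num_slices
    while num_partitions < min_num_partitions:
        num_partitions += num_slices
    return num_partitions


print(calc_num_partitions(num_rows=10_000, num_slices=4, min_num_partitions=10))  # -> 12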
explode = f"POS{explode}" if explode_pos is True else explode
cols: List[Tuple[str, str]] = []
for path, dtype in df.dtypes:
if Spark._is_struct(dtype=dtype):
cols += Spark._flatten_struct_column(path=path, dtype=dtype)
elif Spark._is_array(dtype=dtype):
cols.append((path, "array"))
elif Spark._is_map(dtype=dtype):
cols.append((path, "map"))
else:
cols.append((path, dtype))
cols_exprs: List[Tuple[str, str, str]] = []
expr: str
for path, dtype in cols:
path_under = path.replace('.', '_')
if Spark._is_array(dtype):
if explode_pos:
expr = f"{explode}({path}) AS ({path_under}_pos, {path_under})"
else:
expr = f"{explode}({path}) AS {path_under}"
elif Spark._is_map(dtype):
if explode_pos:
expr = f"{explode}({path}) AS ({path_under}_pos, {path_under}_key, {path_under}_value)"
else:
expr = f"{explode}({path}) AS ({path_under}_key, {path_under}_value)"
else:
expr = f"{path} AS {path.replace('.', '_')}"
cols_exprs.append((path, dtype, expr))
return cols_exprs
def _flatten_struct_dataframe(df: DataFrame,
explode_outer: bool = True,
explode_pos: bool = True) -> List[Tuple[str, str, str]]:
explode: str = "EXPLODE_OUTER" if explode_outer is True else "EXPLODE"
explode = f"POS{explode}" if explode_pos is True else explode
cols: List[Tuple[str, str]] = []
for path, dtype in df.dtypes:
if Spark._is_struct(dtype=dtype):
cols += Spark._flatten_struct_column(path=path, dtype=dtype)
elif Spark._is_array(dtype=dtype):
cols.append((path, "array"))
elif Spark._is_map(dtype=dtype):
cols.append((path, "map"))
else:
cols.append((path, dtype))
cols_exprs: List[Tuple[str, str, str]] = []
expr: str
for path, dtype in cols:
path_under = path.replace('.', '_')
if Spark._is_array(dtype):
if explode_pos:
expr = f"{explode}({path}) AS ({path_under}_pos, {path_under})"
else:
expr = f"{explode}({path}) AS {path_under}"