# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_table(self,
                 database,
                 table,
                 schema,
                 path,
                 file_format,
                 compression,
                 partition_cols_schema=None,
                 extra_args=None):
    """Register *table* in the Glue catalog for data stored at *path*.

    Builds a parquet or CSV table definition and submits it via the Glue
    client.  Raises ``UnsupportedFileFormat`` for any other *file_format*.
    """
    if file_format == "csv":
        definition = Glue.csv_table_definition(table,
                                               partition_cols_schema,
                                               schema,
                                               path,
                                               compression,
                                               extra_args=extra_args)
    elif file_format == "parquet":
        definition = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression)
    else:
        raise UnsupportedFileFormat(file_format)
    self._client_glue.create_table(DatabaseName=database, TableInput=definition)
def _parse_partitions_tuples(objects_paths, partition_cols):
    """Map each distinct object-key prefix in *objects_paths* to its parsed
    partition values.

    Returns a list of ``(prefix, values)`` tuples, one per unique prefix.
    """
    # Collapse every object key to its parent prefix (everything up to and
    # including the last "/"); building a set removes duplicates.
    prefixes = set()
    for object_path in objects_paths:
        prefixes.add(object_path.rpartition('/')[0] + '/')
    return [
        (prefix, Glue._parse_partition_values(path=prefix, partition_cols=partition_cols))
        for prefix in prefixes
    ]
def create_table(self,
                 database,
                 table,
                 schema,
                 path,
                 file_format,
                 compression,
                 partition_cols_schema=None,
                 extra_args=None):
    """Create a Glue catalog table describing the dataset under *path*.

    Supports "parquet" and "csv" formats; anything else raises
    ``UnsupportedFileFormat``.
    """
    # Reject unknown formats up front before building any definition.
    if file_format not in ("parquet", "csv"):
        raise UnsupportedFileFormat(file_format)
    if file_format == "parquet":
        table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression)
    else:  # csv
        table_input = Glue.csv_table_definition(table,
                                                partition_cols_schema,
                                                schema,
                                                path,
                                                compression,
                                                extra_args=extra_args)
    self._client_glue.create_table(DatabaseName=database, TableInput=table_input)
# Derive a schema from the dataframe and register/refresh the table metadata
# in the Glue catalog.
# NOTE(review): this copy of the method is TRUNCATED — the
# self.create_table(...) argument list below is cut off mid-call and the next
# line jumps to an unrelated `def glue`.  Looks like a bad paste/merge;
# compare with the complete copy of this body further down in the file
# before relying on it.
def metadata_to_glue(self,
dataframe,
path,
objects_paths,
file_format,
database=None,
table=None,
partition_cols=None,
preserve_index=True,
mode="append",
compression=None,
cast_columns=None,
extra_args=None):
# CSV keeps index columns on the left of the data columns; otherwise right.
indexes_position = "left" if file_format == "csv" else "right"
schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe,
partition_cols=partition_cols,
preserve_index=preserve_index,
indexes_position=indexes_position,
cast_columns=cast_columns)
# Fall back to deriving the table name from the path, then normalize it to
# Athena's naming rules.
table = table if table else Glue.parse_table_name(path)
table = Athena.normalize_table_name(name=table)
# "overwrite" drops any existing table first so it is recreated below.
if mode == "overwrite":
self.delete_table_if_exists(database=database, table=table)
exists = self.does_table_exists(database=database, table=table)
if not exists:
self.create_table(database=database,
table=table,
schema=schema,
partition_cols_schema=partition_cols_schema,
path=path,
file_format=file_format,
def glue(self):
    """Return the session's Glue helper, creating and caching it on first use."""
    if self._glue:
        return self._glue
    self._glue = Glue(session=self)
    return self._glue
# Register new partitions for *table*, batching the Glue API calls.
# NOTE(review): this copy is TRUNCATED — the error-inspection loop at the
# bottom is cut off mid-condition (no body after the final `if`) and an
# unrelated fragment follows; the raising/handling code is missing here.
def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None):
# Nothing to do for an empty/None partition list.
if not partition_paths:
return None
partitions = list()
for partition in partition_paths:
if file_format == "parquet":
partition_def = Glue.parquet_partition_definition(partition=partition, compression=compression)
elif file_format == "csv":
partition_def = Glue.csv_partition_definition(partition=partition,
compression=compression,
extra_args=extra_args)
else:
raise UnsupportedFileFormat(file_format)
partitions.append(partition_def)
# Submit in pages of 100 — BatchCreatePartition's per-request maximum.
pages_num = int(ceil(len(partitions) / 100.0))
for _ in range(pages_num):
page = partitions[:100]
del partitions[:100]
res = self._client_glue.batch_create_partition(DatabaseName=database,
TableName=table,
PartitionInputList=page)
# Scan returned per-partition errors, tolerating "already exists".
for error in res["Errors"]:
if "ErrorDetail" in error:
if "ErrorCode" in error["ErrorDetail"]:
if error["ErrorDetail"]["ErrorCode"] != "AlreadyExistsException":
# NOTE(review): detached fragment — these lines are the tail of
# metadata_to_glue (table-name normalization through partition registration)
# duplicated here WITHOUT their enclosing `def`; almost certainly a bad
# merge.  Kept verbatim below; the names (self, table, path, mode, ...) only
# make sense inside that method.
table = table if table else Glue.parse_table_name(path)
table = Athena.normalize_table_name(name=table)
# "overwrite" drops any existing table so it gets recreated below.
if mode == "overwrite":
self.delete_table_if_exists(database=database, table=table)
exists = self.does_table_exists(database=database, table=table)
if not exists:
self.create_table(database=database,
table=table,
schema=schema,
partition_cols_schema=partition_cols_schema,
path=path,
file_format=file_format,
compression=compression,
extra_args=extra_args)
# Register one partition per distinct object prefix when partitioned.
if partition_cols:
partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths,
partition_cols=partition_cols)
self.add_partitions(database=database,
table=table,
partition_paths=partitions_tuples,
file_format=file_format,
compression=compression,
extra_args=extra_args)
# NOTE(review): duplicate copy of add_partitions, TRUNCATED right after the
# `if "ErrorDetail" in error:` check — the error-code comparison and any
# raise are missing; an unrelated fragment follows.
def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None):
# Nothing to do for an empty/None partition list.
if not partition_paths:
return None
partitions = list()
for partition in partition_paths:
if file_format == "parquet":
partition_def = Glue.parquet_partition_definition(partition=partition, compression=compression)
elif file_format == "csv":
partition_def = Glue.csv_partition_definition(partition=partition,
compression=compression,
extra_args=extra_args)
else:
raise UnsupportedFileFormat(file_format)
partitions.append(partition_def)
# Page by 100 — the BatchCreatePartition per-request maximum.
pages_num = int(ceil(len(partitions) / 100.0))
for _ in range(pages_num):
page = partitions[:100]
del partitions[:100]
res = self._client_glue.batch_create_partition(DatabaseName=database,
TableName=table,
PartitionInputList=page)
for error in res["Errors"]:
if "ErrorDetail" in error:
# NOTE(review): detached fragment — the parameter tail and body of
# metadata_to_glue duplicated WITHOUT the `def metadata_to_glue(self, ...`
# header, and cut off before the final self.add_partitions(...) call.
# Kept verbatim; reconcile with the method's other copies in this file.
file_format,
database=None,
table=None,
partition_cols=None,
preserve_index=True,
mode="append",
compression=None,
cast_columns=None,
extra_args=None):
# CSV keeps index columns on the left of the data columns; otherwise right.
indexes_position = "left" if file_format == "csv" else "right"
schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe,
partition_cols=partition_cols,
preserve_index=preserve_index,
indexes_position=indexes_position,
cast_columns=cast_columns)
# Fall back to deriving the table name from the path, then normalize it.
table = table if table else Glue.parse_table_name(path)
table = Athena.normalize_table_name(name=table)
if mode == "overwrite":
self.delete_table_if_exists(database=database, table=table)
exists = self.does_table_exists(database=database, table=table)
if not exists:
self.create_table(database=database,
table=table,
schema=schema,
partition_cols_schema=partition_cols_schema,
path=path,
file_format=file_format,
compression=compression,
extra_args=extra_args)
if partition_cols:
partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths,
partition_cols=partition_cols)