Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Read the rows of ``table`` through ``connection`` into a pandas DataFrame.
#
# Parameters (both keyword-only):
#   connection - connection whose catalog supplies the table definition and
#                whose ``_cdata`` handle is handed to the reader.
#   table      - table to read; a plain string is wrapped in
#                ``tab_api.TableName`` before use.
#
# Raises:
#   TypeError - when a column's (type, nullability) pair has no entry in
#               ``pantab_types._pandas_types``.
#
# NOTE(review): indentation has been stripped from this excerpt and the
# definition is cut off after the final ``for`` header below, so the loop
# body and the return statement are not visible here.
def _read_table(*, connection: tab_api.Connection, table: TableType) -> pd.DataFrame:
# Accept a bare string and normalize it to a TableName.
if isinstance(table, str):
table = tab_api.TableName(table)
# Fetch the table's definition from the connection's catalog.
table_def = connection.catalog.get_table_definition(table)
columns = table_def.columns
# Maps each unescaped column name to the dtype string looked up for it;
# entries are added in the order the columns appear in the definition.
dtypes: Dict[str, str] = {}
for column in columns:
column_type = pantab_types._ColumnType(column.type, column.nullability)
try:
dtypes[column.name.unescaped] = pantab_types._pandas_types[column_type]
except KeyError as e:
# No mapping for this (type, nullability) pair: report it as a TypeError
# and keep the original KeyError as the cause.
raise TypeError(
f"Column {column.name} has unsupported datatype {column.type}"
) from e
query = f"SELECT * from {table}"
# The reader receives the dtype strings as a tuple, in column order.
dtype_strs = tuple(dtypes.values())
df = pd.DataFrame(libreader.read_hyper_query(connection._cdata, query, dtype_strs))
# Label the frame with the unescaped column names (same order as dtype_strs).
df.columns = dtypes.keys()
# TODO: remove this hackery...
# NOTE(review): the body of the loop below is missing from this excerpt.
for k, v in dtypes.items():
# Validate the table mode and derive column definitions from ``df``'s dtypes,
# checking them against an existing table where applicable.
#
# Parameters (all except ``df`` are keyword-only):
#   df         - frame whose ``dtypes`` drive the column definitions.
#   connection - connection whose catalog is consulted for an existing table.
#   table      - target table; a plain string is wrapped in
#                ``tab_api.TableName`` before use.
#   table_mode - passed to ``_validate_table_mode``; the value "a" enables the
#                comparison against an existing table's columns
#                (presumably "append" - confirm against the validator).
#
# NOTE(review): indentation has been stripped from this excerpt. The visible
# part ends after the schema check, and ``column_types`` is filled but not
# used in the lines shown, so the definition presumably continues beyond
# this excerpt - confirm against the full file.
def _insert_frame(
df: pd.DataFrame,
*,
connection: tab_api.Connection,
table: pantab_types.TableType,
table_mode: str,
) -> None:
# Check the mode string first, before any catalog access.
_validate_table_mode(table_mode)
# Accept a bare string and normalize it to a TableName.
if isinstance(table, str):
table = tab_api.TableName(table)
# Populate insertion mechanisms dependent on column types
# ``column_types`` and ``columns`` are built in parallel, one entry per
# DataFrame column, in the order ``df.dtypes`` yields them.
column_types: List[pantab_types._ColumnType] = []
columns: List[tab_api.TableDefinition.Column] = []
for col_name, dtype in df.dtypes.items():
# The conversion is keyed on the dtype's name string.
column_type = _pandas_to_tableau_type(dtype.name)
column_types.append(column_type)
columns.append(
tab_api.TableDefinition.Column(
name=col_name,
type=column_type.type_,
nullability=column_type.nullability,
)
)
# Sanity check for existing table structures
# Only runs when the mode is "a" and the catalog already has the table; the
# columns derived from ``df`` are then compared with the stored definition.
if table_mode == "a" and connection.catalog.has_table(table):
table_def = connection.catalog.get_table_definition(table)
_assert_columns_equal(columns, table_def.columns)