Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup_catalog():
    """Provision the MySQL account and database used for the catalog.

    Connects as ``root`` to the server described by the module-level
    ``catalog`` mapping (its ``host`` and ``port`` entries) and then:

    * creates the ``catalog_user`` account if it does not exist,
    * creates the ``tokern`` database if it does not exist,
    * grants ``catalog_user`` all privileges on ``tokern``.

    Both the cursor and the connection are closed before returning, also
    when one of the statements raises.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import section.
    from contextlib import closing

    # NOTE(review): credentials are hard-coded; presumably this is a test
    # fixture running against a throw-away server -- confirm.
    connection = pymysql.connect(
        host=catalog["host"],
        port=catalog["port"],
        user="root",
        password="r00tPa33w0rd",
        database="piidb",
    )
    # Previously only the cursor was context-managed, so the connection itself
    # was never closed. closing() calls connection.close() on exit without
    # relying on the driver's own ``with`` behaviour for connections.
    with closing(connection), connection.cursor() as c:
        c.execute(
            "CREATE USER IF NOT EXISTS catalog_user IDENTIFIED BY 'catal0g_passw0rd'"
        )
        c.execute("CREATE DATABASE IF NOT EXISTS tokern")
        c.execute("GRANT ALL ON tokern.* TO 'catalog_user'@'%'")
class MockExplorer(Explorer):
@classmethod
def parser(cls, sub_parsers):
pass
def _open_connection(self):
pass
def _get_catalog_query(self):
pass
@staticmethod
def get_no_pii_table():
no_pii_table = Table("test_store", "no_pii")
no_pii_a = Column("a")
no_pii_b = Column("b")
import json
from argparse import Namespace
from unittest import TestCase
from piicatcher.explorer.explorer import Explorer
from piicatcher.explorer.metadata import Column, Database, Schema, Table
from piicatcher.piitypes import PiiTypeEncoder, PiiTypes
class MockExplorer(Explorer):
    """``Explorer`` subclass whose overridden hooks all do nothing.

    Every method below returns ``None`` without touching its arguments.
    """

    @classmethod
    def parser(cls, sub_parsers):
        """Ignore ``sub_parsers`` and return ``None``."""
        return None

    def _open_connection(self):
        """Do nothing and return ``None``; no connection is opened."""
        return None

    def _get_catalog_query(self):
        """Do nothing and return ``None``; no query is produced."""
        return None

    def _load_catalog(self):
        """Do nothing and return ``None``; no catalog is loaded."""
        return None
class ExplorerTest(TestCase):
def setUp(self):
if output_format is not None or output is not None:
logging.warning(
"--output-format and --output is deprecated. "
"Please use --catalog-format and --catalog-file"
)
if output_format is not None:
args.catalog["format"] = output_format
if output is not None:
args.catalog["file"] = output
AthenaExplorer.dispatch(args)
# Explorer subclass carrying the SQL text used against Athena.
# Only the query constants are visible in this excerpt.
class AthenaExplorer(Explorer):
# Lists (table_schema, table_name, column_name) from information_schema.columns
# for columns whose data_type matches '%char%', leaving out the
# information_schema schema itself; rows are ordered by schema, table and
# ordinal position.
_catalog_query = """
SELECT
table_schema, table_name, column_name
FROM
information_schema.columns
WHERE data_type LIKE '%char%' AND
table_schema != 'information_schema'
ORDER BY table_schema, table_name, ordinal_position
"""
# Template with {column_list}, {schema_name} and {table_name} placeholders;
# reads through TABLESAMPLE BERNOULLI(5) and caps the result at 10 rows.
_sample_query_template = "select {column_list} from {schema_name}.{table_name} TABLESAMPLE BERNOULLI(5) limit 10"
# Same placeholders as the sampling template, but without TABLESAMPLE:
# a plain select capped at 10 rows.
_select_query_template = (
"select {column_list} from {schema_name}.{table_name} limit 10"
)
# Row-count template with {schema_name} and {table_name} placeholders.
_count_query = "select count(*) from {schema_name}.{table_name}"
if output_format is not None or output is not None:
logging.warning(
"--output-format and --output is deprecated. "
"Please use --catalog-format and --catalog-file"
)
if output_format is not None:
ns.catalog["format"] = output_format
if output is not None:
ns.catalog["file"] = output
RelDbExplorer.dispatch(ns)
class RelDbExplorer(Explorer):
    """Explorer that keeps host, user, password and port taken from ``ns``."""

    def __init__(self, ns):
        """Copy the connection settings found on ``ns`` onto the instance.

        ``ns.port`` is converted with ``int``. When ``ns`` has no ``port``
        entry, or the entry is ``None``, ``self.default_port`` is used.
        """
        super().__init__(ns)
        self.host = ns.host
        self.user = ns.user
        self.password = ns.password

        # default_port is only read when no usable port was supplied.
        port_missing = "port" not in vars(ns) or ns.port is None
        if port_missing:
            self.port = self.default_port
        else:
            self.port = int(ns.port)

    @property
    @abstractmethod
    def default_port(self):
        """Abstract property: the port to use when ``ns`` supplies none."""
if output_format is not None or output is not None:
logging.warning(
"--output-format and --output is deprecated. "
"Please use --catalog-format and --catalog-file"
)
if output_format is not None:
args.catalog["format"] = output_format
if output is not None:
args.catalog["file"] = output
SqliteExplorer.dispatch(args)
# Explorer subclass carrying the SQL text used against SQLite.
# Only the catalog query is visible in this excerpt.
class SqliteExplorer(Explorer):
# Joins sqlite_master (m) with pragma_table_info(m.name) (p) and returns, per
# column: an empty schema_name, the table name, the column name and the
# declared type. Only types matching 'text', 'varchar%' or 'char%' are kept,
# ordered by table name then column name.
# NOTE(review): the schema_name value is written with double quotes ("");
# confirm SQLite resolves it to an empty string literal rather than an
# identifier.
# NOTE(review): there is no filter on sqlite_master's type column, so
# presumably non-table entries are joined as well -- confirm this is intended.
_catalog_query = """
SELECT
"" as schema_name,
m.name as table_name,
p.name as column_name,
p.type as data_type
FROM
sqlite_master AS m
JOIN
pragma_table_info(m.name) AS p
WHERE
p.type like 'text' or p.type like 'varchar%' or p.type like 'char%'
ORDER BY
m.name,
p.name
"""
okta_account_name=okta_account_name,
oauth_token=oauth_token,
oauth_host=oauth_host,
scan_type=scan_type,
list_all=list_all,
catalog=cxt.obj["catalog"],
include_schema=include_schema,
exclude_schema=exclude_schema,
include_table=include_table,
exclude_table=exclude_table,
)
SnowflakeExplorer.dispatch(ns)
class SnowflakeExplorer(Explorer):
_catalog_query = """
SELECT
TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA')
AND DATA_TYPE = 'TEXT'
ORDER BY table_schema, table_name, column_name
"""
_sample_query_template = "select {column_list} from {schema_name}.{table_name} TABLESAMPLE BERNOULLI (10 ROWS)"
def __init__(self, ns):
super(SnowflakeExplorer, self).__init__(ns)
self.account = ns.account