Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_full_pii_table(self):
schema = Schema("public")
table = Table(schema, "full_pii")
table.add_child(Column("name"))
table.add_child(Column("dob"))
table.shallow_scan()
self.assertTrue(table.has_pii())
cols = table.get_children()
self.assertTrue(cols[0].has_pii())
self.assertTrue(cols[1].has_pii())
def setUp(self):
col1 = Column("c1")
col2 = Column("c2")
col2._pii = [PiiTypes.LOCATION]
self.schema = Schema("testSchema")
table = Table(self.schema, "t1")
table.add_child(col1)
table.add_child(col2)
self.schema.add_child(table)
include_table=(),
exclude_table=(),
catalog=None,
)
)
col1 = Column("c1")
col2 = Column("c2")
col2._pii = [PiiTypes.LOCATION]
schema = Schema("s1")
table = Table(schema, "t1")
table.add_child(col1)
table.add_child(col2)
schema = Schema("testSchema")
schema.add_child(table)
self.explorer._database = Database("database")
self.explorer._database.add_child(schema)
self.explorer = MockExplorer(
Namespace(
host="mock_connection",
include_schema=(),
exclude_schema=(),
include_table=(),
exclude_table=(),
catalog=None,
)
)
col1 = Column("c1")
col2 = Column("c2")
col2._pii = [PiiTypes.LOCATION]
schema = Schema("s1")
table = Table(schema, "t1")
table.add_child(col1)
table.add_child(col2)
schema = Schema("testSchema")
schema.add_child(table)
self.explorer._database = Database("database")
self.explorer._database.add_child(schema)
def test_partial_pii_table(self):
schema = Schema("public")
table = Table(schema, "partial_pii")
table.add_child(Column("a"))
table.add_child(Column("b"))
table.scan(self.data_generator)
self.assertTrue(table.has_pii())
cols = table.get_children()
self.assertTrue(cols[0].has_pii())
self.assertFalse(cols[1].has_pii())
self.assertEqual(
{
"columns": [
{"name": "a", "pii_types": [PiiTypes.PHONE]},
{"name": "b", "pii_types": []},
],
"has_pii": True,
def _load_catalog(self):
schema = Schema(
"test_store", include=self._include_table, exclude=self._exclude_table
)
schema.add_child(MockExplorer.get_no_pii_table())
schema.add_child(MockExplorer.get_partial_pii_table())
schema.add_child(MockExplorer.get_full_pii_table())
self._database = mdDatabase(
"database", include=self._include_schema, exclude=self._exclude_schema
)
self._database.add_child(schema)
def test_no_pii_table(self):
schema = Schema("public")
table = Table(schema, "no_pii")
table.add_child(Column("a"))
table.add_child(Column("b"))
table.scan(self.data_generator)
self.assertFalse(table.has_pii())
self.assertEqual(
{
"columns": [
{"name": "a", "pii_types": []},
{"name": "b", "pii_types": []},
],
"has_pii": False,
"name": "no_pii",
},
table.get_dict(),
def __init__(self, name, include=(), exclude=()):
super(Schema, self).__init__(name, include, exclude)
row = cursor.fetchone()
current_schema = None
current_table = None
if row is not None:
current_schema = Schema(
row[0], include=self._include_table, exclude=self._exclude_table
)
current_table = Table(current_schema, row[1])
while row is not None:
if current_schema.get_name() != row[0]:
current_schema.add_child(current_table)
self._database.add_child(current_schema)
current_schema = Schema(
row[0],
include=self._include_table,
exclude=self._exclude_table,
)
current_table = Table(current_schema, row[1])
elif current_table.get_name() != row[1]:
current_schema.add_child(current_table)
current_table = Table(current_schema, row[1])
current_table.add_child(Column(row[2]))
row = cursor.fetchone()
if current_schema is not None and current_table is not None:
current_schema.add_child(current_table)
self._database.add_child(current_schema)
with self._get_context_manager() as cursor:
logging.debug("Catalog Query: {0}".format(self._get_catalog_query()))
cursor.execute(self._get_catalog_query())
self._database = Database(
"database",
include=self._include_schema,
exclude=self._exclude_schema,
)
row = cursor.fetchone()
current_schema = None
current_table = None
if row is not None:
current_schema = Schema(
row[0], include=self._include_table, exclude=self._exclude_table
)
current_table = Table(current_schema, row[1])
while row is not None:
if current_schema.get_name() != row[0]:
current_schema.add_child(current_table)
self._database.add_child(current_schema)
current_schema = Schema(
row[0],
include=self._include_table,
exclude=self._exclude_table,
)
current_table = Table(current_schema, row[1])
elif current_table.get_name() != row[1]:
current_schema.add_child(current_table)