Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
)
col1 = Column("c1")
col2 = Column("c2")
col2._pii = [PiiTypes.LOCATION]
schema = Schema("s1")
table = Table(schema, "t1")
table.add_child(col1)
table.add_child(col2)
schema = Schema("testSchema")
schema.add_child(table)
self.explorer._database = Database("database")
self.explorer._database.add_child(schema)
def _load_catalog(self):
    """Build a fixed catalog and store it on ``self._database``.

    A schema named "test_store" is created with this explorer's table
    include/exclude filters and filled, in order, with the tables returned
    by ``MockExplorer.get_no_pii_table``, ``get_partial_pii_table`` and
    ``get_full_pii_table``. A database named "database" is then created
    with the schema include/exclude filters and the schema is attached
    to it.
    """
    store = Schema(
        "test_store", include=self._include_table, exclude=self._exclude_table
    )

    # Order matters: tables are attached in the same sequence every time.
    table_factories = (
        MockExplorer.get_no_pii_table,
        MockExplorer.get_partial_pii_table,
        MockExplorer.get_full_pii_table,
    )
    for make_table in table_factories:
        store.add_child(make_table())

    self._database = mdDatabase(
        "database", include=self._include_schema, exclude=self._exclude_schema
    )
    self._database.add_child(store)
def __init__(self, ns):
    """Initialise the explorer from the settings object ``ns``.

    ``ns`` must expose the attributes ``catalog``, ``include_schema``,
    ``exclude_schema``, ``include_table`` and ``exclude_table``
    (presumably a parsed command-line namespace; confirm with callers).
    The connection and the cache timestamp start out as ``None``, and an
    empty database named "database" is created with the schema filters.
    """
    # Nothing is connected or cached yet.
    self._connection = self._cache_ts = None

    self.catalog = ns.catalog

    # Filters are copied from ``ns`` so later catalog loads can reuse them.
    self._include_schema, self._exclude_schema = (
        ns.include_schema,
        ns.exclude_schema,
    )
    self._include_table, self._exclude_table = (
        ns.include_table,
        ns.exclude_table,
    )

    self._database = Database(
        "database",
        include=self._include_schema,
        exclude=self._exclude_schema,
    )
def __init__(self, name, include=(), exclude=()):
    """Initialise the database by delegating to the parent initializer.

    ``name``, ``include`` and ``exclude`` are forwarded positionally and
    unchanged; ``include`` and ``exclude`` default to empty tuples.
    """
    parent = super(Database, self)
    parent.__init__(name, include, exclude)
def _load_catalog(self):
if self._cache_ts is None or self._cache_ts < datetime.now() - timedelta(
minutes=10
):
with self._get_context_manager() as cursor:
logging.debug("Catalog Query: {0}".format(self._get_catalog_query()))
cursor.execute(self._get_catalog_query())
self._database = Database(
"database",
include=self._include_schema,
exclude=self._exclude_schema,
)
row = cursor.fetchone()
current_schema = None
current_table = None
if row is not None:
current_schema = Schema(
row[0], include=self._include_table, exclude=self._exclude_table
)
current_table = Table(current_schema, row[1])