How to use the piicatcher.explorer.metadata.Table class in piicatcher

To help you get started, we’ve selected a few piicatcher examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github tokern / piicatcher / tests / test_explorer.py View on Github external
Namespace(
                host="mock_connection",
                include_schema=(),
                exclude_schema=(),
                include_table=(),
                exclude_table=(),
                catalog=None,
            )
        )

        col1 = Column("c1")
        col2 = Column("c2")
        col2._pii = [PiiTypes.LOCATION]

        schema = Schema("s1")
        table = Table(schema, "t1")
        table.add_child(col1)
        table.add_child(col2)

        schema = Schema("testSchema")
        schema.add_child(table)

        self.explorer._database = Database("database")
        self.explorer._database.add_child(schema)
github tokern / piicatcher / tests / test_databases.py View on Github external
def setUp(self):
        """Build the ``testSchema`` fixture and store it on ``self.schema``.

        The schema receives a single table ``t1`` holding two columns,
        ``c1`` and ``c2``. Column ``c2`` has its ``_pii`` attribute preset
        to ``[PiiTypes.LOCATION]``; ``c1`` is left exactly as constructed.
        """
        plain_column = Column("c1")
        location_column = Column("c2")
        # Set the private attribute directly so no scan is needed.
        location_column._pii = [PiiTypes.LOCATION]

        fixture_schema = Schema("testSchema")
        self.schema = fixture_schema

        fixture_table = Table(fixture_schema, "t1")
        for column in (plain_column, location_column):
            fixture_table.add_child(column)

        fixture_schema.add_child(fixture_table)
github tokern / piicatcher / tests / test_models.py View on Github external
def get_full_pii_table():
        """Return a ``full_pii`` table in which every column is tagged with PII.

        The table is created with ``"test_store"`` as its first argument.
        Column ``a`` is tagged ``PiiTypes.PHONE``; column ``b`` is tagged
        ``PiiTypes.ADDRESS`` and then ``PiiTypes.LOCATION``.
        """
        table = Table("test_store", "full_pii")

        phone_column = Column("a")
        phone_column.add_pii_type(PiiTypes.PHONE)

        place_column = Column("b")
        for pii_type in (PiiTypes.ADDRESS, PiiTypes.LOCATION):
            place_column.add_pii_type(pii_type)

        # Attach children in the same order they were created: a, then b.
        for column in (phone_column, place_column):
            table.add_child(column)

        return table
github tokern / piicatcher / tests / test_dbmetadata.py View on Github external
def test_full_pii_table(self):
        """After ``shallow_scan``, the table and its first two columns report PII.

        The table ``full_pii`` lives in schema ``public`` and is given the
        columns ``name`` and ``dob`` before scanning.
        """
        pii_table = Table(Schema("public"), "full_pii")
        for column_name in ("name", "dob"):
            pii_table.add_child(Column(column_name))

        pii_table.shallow_scan()
        self.assertTrue(pii_table.has_pii())

        children = pii_table.get_children()
        # Check exactly the first two children, by index.
        for position in (0, 1):
            self.assertTrue(children[position].has_pii())
github tokern / piicatcher / tests / test_dbmetadata.py View on Github external
def test_no_pii_table(self):
        """After ``shallow_scan``, a table with columns ``a`` and ``b`` reports no PII.

        The table ``no_pii`` is created in schema ``public``.
        """
        clean_table = Table(Schema("public"), "no_pii")
        for column_name in ("a", "b"):
            clean_table.add_child(Column(column_name))

        clean_table.shallow_scan()
        self.assertFalse(clean_table.has_pii())
github tokern / piicatcher / tests / test_models.py View on Github external
def get_no_pii_table():
        """Return a ``no_pii`` table whose columns carry no PII tags.

        The table is created with ``"test_store"`` as its first argument and
        receives two freshly constructed columns, ``a`` and ``b``, with no
        PII types added to either.
        """
        table = Table("test_store", "no_pii")
        first_column = Column("a")
        second_column = Column("b")

        # Attach children in creation order: a, then b.
        for column in (first_column, second_column):
            table.add_child(column)

        return table
github tokern / piicatcher / tests / test_dbmetadata.py View on Github external
def test_full_pii_table(self):
        schema = Schema("public")
        table = Table(schema, "full_pii")
        table.add_child(Column("name"))
        table.add_child(Column("location"))

        table.scan(self.data_generator)
        self.assertTrue(table.has_pii())

        cols = table.get_children()
        self.assertTrue(cols[0].has_pii())
        self.assertTrue(cols[1].has_pii())
        self.assertEqual(
            {
                "columns": [
                    {"name": "name", "pii_types": [PiiTypes.PERSON]},
                    {"name": "location", "pii_types": [PiiTypes.LOCATION]},
                ],
                "has_pii": True,
github tokern / piicatcher / piicatcher / explorer / explorer.py View on Github external
)
                    current_table = Table(current_schema, row[1])

                while row is not None:
                    if current_schema.get_name() != row[0]:
                        current_schema.add_child(current_table)
                        self._database.add_child(current_schema)
                        current_schema = Schema(
                            row[0],
                            include=self._include_table,
                            exclude=self._exclude_table,
                        )
                        current_table = Table(current_schema, row[1])
                    elif current_table.get_name() != row[1]:
                        current_schema.add_child(current_table)
                        current_table = Table(current_schema, row[1])
                    current_table.add_child(Column(row[2]))

                    row = cursor.fetchone()

                if current_schema is not None and current_table is not None:
                    current_schema.add_child(current_table)
                    self._database.add_child(current_schema)

            self._cache_ts = datetime.now()
github tokern / piicatcher / piicatcher / explorer / explorer.py View on Github external
if row is not None:
                    current_schema = Schema(
                        row[0], include=self._include_table, exclude=self._exclude_table
                    )
                    current_table = Table(current_schema, row[1])

                while row is not None:
                    if current_schema.get_name() != row[0]:
                        current_schema.add_child(current_table)
                        self._database.add_child(current_schema)
                        current_schema = Schema(
                            row[0],
                            include=self._include_table,
                            exclude=self._exclude_table,
                        )
                        current_table = Table(current_schema, row[1])
                    elif current_table.get_name() != row[1]:
                        current_schema.add_child(current_table)
                        current_table = Table(current_schema, row[1])
                    current_table.add_child(Column(row[2]))

                    row = cursor.fetchone()

                if current_schema is not None and current_table is not None:
                    current_schema.add_child(current_table)
                    self._database.add_child(current_schema)

            self._cache_ts = datetime.now()