Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
self.mover = Mover(
database_type='postgresql',
options=self.options,
runner=self.runner,
version_detector=self.version_detector,
Binary=driver.Binary,
)
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.mover.jsonifier = Jsonifier(
transform=getattr(self.options, 'transform', None))
self.mover.auxiliary_tables = getattr(self.options,
'auxiliary_tables', ())
class Mover(relstorage.adapters.postgresql.mover.PostgreSQLObjectMover):
def on_store_opened(self, cursor, restart=False):
cursor.execute("""\
select from information_schema.tables
where table_name = 'temp_store' and table_type = 'LOCAL TEMPORARY'
""")
if not list(cursor):
super(Mover, self).on_store_opened(cursor, restart)
cursor.execute("""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_store_json (
zoid BIGINT NOT NULL,
class_name TEXT,
ghost_pickle BYTEA,
state JSONB
) ON COMMIT DROP""")