Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args, **kw):
super(Adapter, self).__init__(*args, **kw)
driver = relstorage.adapters.postgresql.select_driver(self.options)
self.schema = SchemaInstaller(
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
keep_history=self.keep_history,
)
self.mover = Mover(
database_type='postgresql',
options=self.options,
runner=self.runner,
version_detector=self.version_detector,
Binary=driver.Binary,
)
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.mover.jsonifier = Jsonifier(
def pg_connection(dsn, driver_name='auto'):
"""Create a PostgreSQL (not newt) database connection
This function should be used rather than, for example, calling
``psycopg2.connect``, because it can use other Postgres drivers
depending on the Python environment and available modules.
"""
options = relstorage.options.Options(driver=driver_name)
driver = relstorage.adapters.postgresql.select_driver(options)
return driver.connect(dsn)
options = parser.parse_args(args)
if options.logging_configuration.upper() in logging_levels:
logging.basicConfig(level=options.logging_configuration.upper())
else:
with open(options.logging_configuration) as f:
from ZConfig import configureLoggers
configureLoggers(f.read())
transform = options.transform
if transform is not None:
from .component import global_by_name
transform = global_by_name(transform)
jsonifier = Jsonifier(transform=transform)
driver = relstorage.adapters.postgresql.select_driver(
relstorage.options.Options(driver=options.driver))
Binary = driver.Binary
dsn = options.connection_string
with closing(pg_connection(dsn)) as conn:
with closing(conn.cursor()) as cursor:
if options.nagios:
if not table_exists(cursor, 'newt_follow_progress'):
print("Updater has not run")
return 2
cursor.execute("select max(tid) from object_state")
[[stid]] = cursor
utid = follow.get_progress_tid(conn, __name__)
if stid is None:
if utid == -1:
print("No transactions")
return 0