Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if options.logging_configuration.upper() in logging_levels:
logging.basicConfig(level=options.logging_configuration.upper())
else:
with open(options.logging_configuration) as f:
from ZConfig import configureLoggers
configureLoggers(f.read())
transform = options.transform
if transform is not None:
from .component import global_by_name
transform = global_by_name(transform)
jsonifier = Jsonifier(transform=transform)
driver = relstorage.adapters.postgresql.select_driver(
relstorage.options.Options(driver=options.driver))
Binary = driver.Binary
dsn = options.connection_string
with closing(pg_connection(dsn)) as conn:
with closing(conn.cursor()) as cursor:
if options.nagios:
if not table_exists(cursor, 'newt_follow_progress'):
print("Updater has not run")
return 2
cursor.execute("select max(tid) from object_state")
[[stid]] = cursor
utid = follow.get_progress_tid(conn, __name__)
if stid is None:
if utid == -1:
print("No transactions")
return 0
else:
def storage(dsn, keep_history=False, transform=None, auxiliary_tables=(), **kw):
    """Build a RelStorage storage backed by the newt PostgreSQL adapter.

    ``dsn`` is handed to ``Adapter`` together with the options object.
    ``keep_history`` and any extra keyword arguments are forwarded to
    ``relstorage.options.Options``; ``transform`` and ``auxiliary_tables``
    are then attached to that options object as attributes.

    Returns the ``relstorage.storage.RelStorage`` instance built from the
    adapter and the same options object.

    NOTE(review): the previous docstring said keyword options may also be
    ZODB.DB options; in this function every extra keyword goes to
    ``relstorage.options.Options`` -- confirm against callers.
    """
    rs_options = relstorage.options.Options(keep_history=keep_history, **kw)
    # These two values are set as attributes after construction rather
    # than being passed through the Options constructor.
    rs_options.transform = transform
    rs_options.auxiliary_tables = auxiliary_tables
    adapter = Adapter(dsn, rs_options)
    return relstorage.storage.RelStorage(adapter, options=rs_options)
def pg_connection(dsn, driver_name='auto'):
    """Open a plain PostgreSQL (not newt) database connection.

    Prefer this over calling a specific driver directly (for example
    ``psycopg2.connect``): the driver is picked by RelStorage's
    ``select_driver`` from ``driver_name``, so a different Postgres
    driver may be used depending on the Python environment and the
    modules available.

    Returns whatever the selected driver's ``connect(dsn)`` returns.
    """
    driver_options = relstorage.options.Options(driver=driver_name)
    selected = relstorage.adapters.postgresql.select_driver(driver_options)
    return selected.connect(dsn)