Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def RelStorageConfigurationFactory(key, dbconfig):
if not RELSTORAGE:
raise Exception("You must install the relstorage package before you can use "
"it as a dabase adapter.")
config = dbconfig.get('configuration', {})
options = Options(**dbconfig['options'])
if dbconfig['type'] == 'postgres':
from relstorage.adapters.postgresql import PostgreSQLAdapter
dsn = "dbname={dbname} user={user} host={host} password={password} port={port}".format(**dbconfig['dsn']) # noqa
adapter = PostgreSQLAdapter(dsn=dsn, options=options)
rs = RelStorage(adapter=adapter, options=options)
db = DB(rs)
try:
conn = db.open()
rootobj = conn.root()
if not IDatabase.providedBy(rootobj):
alsoProvides(rootobj, IDatabase)
transaction.commit()
except: # noqa
pass
finally:
rootobj = None
conn.close()
db.close()
rs = RelStorage(adapter=adapter, options=options)
db = RequestAwareDB(rs, **config)
def open(self, databases=None):
db = self.config.db
db.name = db.name or self.name
db = db.open(databases)
assert (
isinstance(db.storage,
relstorage.storage.RelStorage)
and
isinstance(db.storage._adapter,
relstorage.adapters.postgresql.PostgreSQLAdapter)
), "Invalid storage"
from ._db import NewtDB
return NewtDB(db)
import relstorage.adapters.postgresql
import relstorage.adapters.postgresql.mover
import relstorage.adapters.postgresql.schema
from .jsonpickle import Jsonifier
from ._util import trigger_exists
class Adapter(relstorage.adapters.postgresql.PostgreSQLAdapter):
def __init__(self, *args, **kw):
super(Adapter, self).__init__(*args, **kw)
driver = relstorage.adapters.postgresql.select_driver(self.options)
self.schema = SchemaInstaller(
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
keep_history=self.keep_history,
)
self.mover = Mover(
database_type='postgresql',
options=self.options,
runner=self.runner,
version_detector=self.version_detector,
def factory(options):
return PostgreSQLAdapter(dsn=dsn, options=options)
return factory, unused