Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main(repos, revision):
"""
Main function.
"""
import pysvn
import os.path
client = pysvn.Client()
diff = client.diff_summarize(repos,
revision1=pysvn.Revision(pysvn.opt_revision_kind.number, revision - 1),
revision2=pysvn.Revision(pysvn.opt_revision_kind.number, revision))
conn = sqlobject.connectionForURI(DATABASE_URI)
sqlobject.sqlhub.processConnection = conn
#PythonScore.createTable()
func = lambda f: os.path.splitext(f.path)[-1] == ".py"
for entry in filter(func, diff):
path = os.path.join(repos, entry.path)
score, old_score, credit = process_file(path)
info = client.info(path)
PythonScore(username=info['commit_author'], pathname=path, revision="1",
score=score, old_score=old_score, credit=credit)
def getConnection(self):
try:
conn = self.threadingLocal.connection
return conn
except AttributeError:
if self.uri:
conn = sqlobject.connectionForURI(self.uri)
# the following line effectively turns off the DBAPI connection
# cache. We're already holding on to a connection per thread,
# and the cache causes problems with sqlite.
if self.uri.startswith("sqlite"):
TheURIOpener.cachedURIs = {}
self.threadingLocal.connection = conn
if not self.pool_connections:
# This disables pooling
conn._pool = None
return conn
try:
return self.processConnection
except AttributeError:
raise AttributeError(
"No connection has been defined for this thread "
"or process")
def connect(opts):
moduleLogVerbose.info("Connecting to db at %s" % opts.dbpath)
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI('sqlite://%s' % opts.dbpath)
def connection(self):
config = self.config()
if config is not None:
assert config.get('database'), (
"No database variable found in config file %s"
% self.options.config_file)
return sqlobject.connectionForURI(config['database'])
elif getattr(self.options, 'connection_uri', None):
return sqlobject.connectionForURI(self.options.connection_uri)
else:
return None
def getConnection(self):
try:
conn = self.threadingLocal.connection
return conn
except AttributeError:
if self.uri:
conn = sqlobject.connectionForURI(self.uri)
# the following line effectively turns off the DBAPI connection
# cache. We're already holding on to a connection per thread,
# and the cache causes problems with sqlite.
if self.uri.startswith("sqlite"):
TheURIOpener.cachedURIs = {}
self.threadingLocal.connection = conn
if not self.pool_connections:
# This disables pooling
conn._pool = None
return conn
try:
return self.processConnection
except AttributeError:
raise AttributeError(
"No connection has been defined for this thread "
"or process")
self.runner.invalid(
self.min_args_error % {'min_args': self.min_args,
'actual_args': len(self.args)})
if self.max_args is not None and len(self.args) > self.max_args:
self.runner.invalid(
self.max_args_error % {'max_args': self.max_args,
'actual_args': len(self.args)})
for var_name, option_name in self.required_args:
if not getattr(self.options, var_name, None):
self.runner.invalid(
'You must provide the option %s' % option_name)
conf = self.config()
if conf and conf.get('sys_path'):
update_sys_path(conf['sys_path'], self.options.verbose)
if conf and conf.get('database'):
conn = sqlobject.connectionForURI(conf['database'])
sqlobject.sqlhub.processConnection = conn
for egg_spec in getattr(self.options, 'eggs', []):
self.load_options_from_egg(egg_spec)
self.command()
archivePoolShares()
archivePools()
archiveTaskNodes()
def groupForThread2():
archiveTasks()
archiveFolderNodes()
def groupForThread3():
archiveTaskGroups()
archiveCommands()
DB_URL = "mysql://root@127.0.0.1/pulidb"
STAT_DB_URL = "mysql://root@127.0.0.1/pulistatdb"
mainConn = connectionForURI(DB_URL)
statConn = connectionForURI(STAT_DB_URL)
threading.Thread(target=groupForThread1).start()
threading.Thread(target=groupForThread2).start()
threading.Thread(target=groupForThread3).start()
import os
from datetime import datetime
from decimal import Decimal
from sqlobject import (DatabaseIndex, DateTimeCol, IntCol, SQLObject, connectionForURI,
sqlhub, ForeignKey, RelatedJoin, MultipleJoin, FloatCol, UnicodeCol, DecimalCol, JSONCol)
dbtype = os.environ.get('DBTYPE', '')
if dbtype == 'postgres':
conn = sqlhub.processConnection = connectionForURI('postgres://postgres@localhost/tbench')
elif dbtype == 'mysql':
conn = sqlhub.processConnection = connectionForURI('mysql://root:@localhost/tbench')
else:
conn = sqlhub.processConnection = connectionForURI('sqlite:/dev/shm/db.sqlite3')
test = int(os.environ.get('TEST', '1'))
if test == 1:
class Journal(SQLObject):
timestamp = DateTimeCol(default=datetime.now, notNone=True)
level = IntCol(notNone=True)
level_index = DatabaseIndex('level')
text = UnicodeCol(length=255, notNone=True)
text_index = DatabaseIndex('text')
if test == 2:
class Journal(SQLObject):
timestamp = DateTimeCol(default=datetime.now, notNone=True)
level = IntCol(notNone=True)
level_index = DatabaseIndex('level')
def create_transaction(self):
"""Return a new transaction for connection"""
from sqlobject import connectionForURI
if not self.connection:
self.connection = connectionForURI(self.dsn)
self.close_conn = True # because we made it
if self.use_transaction:
return self.connection.transaction()
else:
return self.connection
def setup(databaseUrl=DATABASE_URL):
# Establish the connection
connection = connectionForURI(databaseUrl)
connection.dbEncoding="utf8"
# No pooling
connection._pool = None
sqlhub.processConnection = connection
# Creates tables
Ad.createTable(ifNotExists=True)
Report.createTable(ifNotExists=True)