Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_driver(url='neo4j', neo4j_auth='neo4j/neo4j'):
    """
    Get the driver to the network connection using the bolt service
    at the URI provided.

    The driver is created for ``bolt://<url>:7687``. If an authentication
    string is provided (i.e. a typical NEO4J_AUTH value of
    ``<username>/<password>``) then authentication is assumed. A value
    that contains no ``/`` at all yields a driver without credentials.

    :param url: host name (or address) placed between ``bolt://`` and ``:7687``
    :param neo4j_auth: credentials in ``<username>/<password>`` form
    :return: the driver for the graph database
    """
    # Import at call time rather than at module load.
    from neo4j.v1 import GraphDatabase

    bolt_uri = 'bolt://' + url + ':7687'

    # Split on the first '/' only, so a password that itself contains '/'
    # is kept intact instead of silently disabling authentication.
    username, separator, password = neo4j_auth.partition('/')
    if separator:
        print('Getting driver for user "{}"...'.format(username))
        return GraphDatabase.driver(bolt_uri, auth=(username, password))

    # No '/' present: connect without credentials.
    return GraphDatabase.driver(bolt_uri)
# Module bootstrap: import helpers, install the request cache, and open the
# neo4j driver/session used by the rest of this module. Everything below runs
# as a side effect of importing this file, and the order of statements matters
# (sys.path is extended before the imports that depend on it).
import sys
import os
# Try a plain import first; on failure, add the '../kg-construction' directory
# (resolved relative to this file) to sys.path and import again.
try:
import QueryNCBIeUtils
except ImportError:
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../kg-construction'))) # Go up one level and look for it
import QueryNCBIeUtils
import requests_cache
# Install a requests_cache cache named 'orangeboard'.
requests_cache.install_cache('orangeboard')
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../") # code directory
from RTXConfiguration import RTXConfiguration
# NOTE: this rebinds the name RTXConfiguration from the imported class to an
# instance of it; below this line the name refers to the instance.
RTXConfiguration = RTXConfiguration()
# Connection information for the neo4j server, populated with orangeboard
# NOTE(review): the user name and password are hard-coded string literals.
# GraphDatabase and basic_auth are not imported in the visible part of this
# file -- presumably imported elsewhere; confirm.
driver = GraphDatabase.driver(RTXConfiguration.bolt, auth=basic_auth("neo4j", "precisionmedicine"))
session = driver.session()
# Connection information for the ipython-cypher package
# The same hard-coded credentials are embedded in this connection string.
connection = "http://neo4j:precisionmedicine@" + RTXConfiguration.database
DEFAULT_CONFIGURABLE = {
"auto_limit": 0,
"style": 'DEFAULT',
"short_errors": True,
"data_contents": True,
"display_limit": 0,
"auto_pandas": False,
"auto_html": False,
"auto_networkx": False,
"rest": False,
"feedback": False, # turn off verbosity in ipython-cypher
"uri": connection,
def _connect(self):
    """Create the Bolt driver for ``self.host``/``self.port`` and keep it on the instance.

    Credentials are supplied as a ``(username, password)`` tuple only when
    ``self.username`` is truthy; otherwise the driver is created with
    ``auth=None``.
    """
    from neo4j.v1 import GraphDatabase

    logger.info('Connecting to Neo4J graph database ...')

    if self.username:
        credentials = (self.username, self.password)
    else:
        credentials = None

    bolt_uri = "bolt://{}:{}".format(self.host, self.port)
    # TODO: Add kerberos support
    self.__driver = GraphDatabase.driver(bolt_uri, auth=credentials)
except ImportError:
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../kg-construction'))) # Go up one level and look for it
from NormGoogleDistance import NormGoogleDistance
# Module bootstrap: instantiate the query helpers, install the request cache,
# and open the neo4j driver/session. These statements run as a side effect of
# importing this file and depend on their order.
# Each name is rebound to an instance created from its same-named attribute
# (presumably module -> class instance; the imports are outside this view).
QueryEBIOLS = QueryEBIOLS.QueryEBIOLS()
QueryNCBIeUtils = QueryNCBIeUtils.QueryNCBIeUtils()
# Install a requests_cache cache named 'orangeboard'.
requests_cache.install_cache('orangeboard')
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../") # code directory
from RTXConfiguration import RTXConfiguration
# NOTE: this rebinds the name RTXConfiguration from the imported class to an
# instance of it; below this line the name refers to the instance.
RTXConfiguration = RTXConfiguration()
# Connection information for the neo4j server, populated with orangeboard
# NOTE(review): the user name and password are hard-coded string literals.
driver = GraphDatabase.driver(RTXConfiguration.bolt, auth=basic_auth("neo4j", "precisionmedicine"))
session = driver.session()
# Connection information for the ipython-cypher package
# The same hard-coded credentials are embedded in this connection string.
connection = "http://neo4j:precisionmedicine@" + RTXConfiguration.database
DEFAULT_CONFIGURABLE = {
"auto_limit": 0,
"style": 'DEFAULT',
"short_errors": True,
"data_contents": True,
"display_limit": 0,
"auto_pandas": False,
"auto_html": False,
"auto_networkx": False,
"rest": False,
"feedback": False, # turn off verbosity in ipython-cypher
"uri": connection,
def neo4j_connect(self):
    """Create a neo4j driver from the instance's settings and store it in ``self.driver``.

    ``self.neo4j_url``, ``self.neo4j_user`` and ``self.neo4j_password`` must
    all be set (not ``None``); each is checked with an assertion, in that
    order, before the driver is created.
    """
    for required in ('neo4j_url', 'neo4j_user', 'neo4j_password'):
        assert getattr(self, required) is not None

    make_driver = neo4j.v1.GraphDatabase.driver
    self.driver = make_driver(
        self.neo4j_url,
        auth=(self.neo4j_user, self.neo4j_password),
    )
# def neo4j_shutdown(self):
def __init__(self, host, port, username=None, password=None, ssl=False, timeout=None):
    """Open a Bolt driver against ``host`` and keep a session in ``self.session``.

    :param host: host name or address of the neo4j server
    :param port: Bolt port appended to the URI; omitted when ``None``
    :param username: optional user name, sent only together with ``password``
    :param password: optional password, sent only together with ``username``
    :param ssl: currently unused by this constructor
    :param timeout: currently unused by this constructor
    """
    # The port used to be passed to format() without appearing in the
    # template, so it was silently dropped from the URI.
    # NOTE(review): callers that pass a non-Bolt port will now see it
    # honoured -- verify the call sites.
    if port is None:
        bolt_uri = "bolt://{host}".format(host=host)
    else:
        bolt_uri = "bolt://{host}:{port}".format(host=host, port=port)

    # Credentials were previously accepted but never sent to the server.
    if username and password:
        driver = GraphDatabase.driver(bolt_uri, auth=(username, password))
    else:
        driver = GraphDatabase.driver(bolt_uri)

    self.session = driver.session()
def __init__(self, bolt, user=None, password=None, debug=False):
    """
    Create the neo4j driver for this object.

    Any credential left as ``None`` is read from a freshly created
    ``RTXConfiguration``; the configuration is only instantiated when at
    least one of the two is missing.

    :param bolt: A string containing the bolt address of the neo4j instance you wish to upload to
    :param user: A string containing the username for neo4j
    :param password: A string containing the password for neo4j
    :param debug: Flag stored on the instance as ``self.debug``
    """
    needs_config = user is None or password is None
    if needs_config:
        config = RTXConfiguration()
        user = config.neo4j_username if user is None else user
        password = config.neo4j_password if password is None else password

    self.debug = debug

    # Connection information for the neo4j server, populated with orangeboard
    credentials = basic_auth(user, password)
    self.driver = GraphDatabase.driver(bolt, auth=credentials)