Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Open a pydruid DB-API connection for this object and keep the handle on the instance.
# NOTE(review): the visible snippet is truncated -- the logger.warning( call below is
# never closed in this source, and the original indentation has been stripped.
def _connect(self):
# pydruid is imported inside the function body rather than at module level.
from pydruid.db import connect
logger.info('Connecting to Druid database ...')
# Connect using this object's host and port, with the SQL endpoint path and the
# 'http' scheme hard-coded; the result is stored on self.__druid.
self.__druid = connect(self.host, self.port, path='/druid/v2/sql/', scheme='http')
# The connect() call above is given no credentials, so warn when either a
# username or a password is set on this object.
if self.username or self.password:
logger.warning(
'Duct username and password not passed to pydruid connection. '
'pydruid connection currently does not allow these fields to be passed.'
def get_conn(self):
    """
    Open and return a connection to the Druid broker.

    The connection record is looked up with ``self.get_connection`` using
    ``self.druid_broker_conn_id``. Its host, port, login and password are
    passed to ``connect``. The endpoint path and URL scheme are read from
    the record's ``extra_dejson`` (keys ``endpoint`` and ``schema``),
    falling back to ``/druid/v2/sql`` and ``http`` when absent.

    :return: the object returned by ``connect`` for those settings
    """
    broker = self.get_connection(self.druid_broker_conn_id)

    # Collect the keyword arguments first so the call itself stays short.
    settings = {
        'host': broker.host,
        'port': broker.port,
        'path': broker.extra_dejson.get('endpoint', '/druid/v2/sql'),
        'scheme': broker.extra_dejson.get('schema', 'http'),
        'user': broker.login,
        'password': broker.password,
    }
    connection = connect(**settings)

    self.log.info(
        'Get the connection to druid broker on %s using user %s',
        broker.host,
        broker.login,
    )
    return connection
def run_query(self, query, user):
connection = connect(host=self.configuration['host'],
port=self.configuration['port'],
path='/druid/v2/sql/',
scheme=(self.configuration.get('scheme') or 'http'),
user=(self.configuration.get('user') or None),
password=(self.configuration.get('password') or None))
cursor = connection.cursor()
try:
cursor.execute(query)
columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
rows = [dict(zip((column['name'] for column in columns), row)) for row in cursor]
data = {'columns': columns, 'rows': rows}
error = None
json_data = json_dumps(data)