Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _connect(self):
from sqlalchemy import create_engine, MetaData
if self.driver == 'pyhive':
try:
import pyhive.hive
except ImportError:
raise ImportError("""
Omniduct is attempting to use the 'pyhive' driver, but it
is not installed. Please either install the pyhive package,
or reconfigure this Duct to use the 'impyla' driver.
""")
self.__hive = pyhive.hive.connect(host=self.host,
port=self.port,
auth=self.auth_mechanism,
database=self.schema,
username=self.username,
password=self.password,
**self.connection_options)
self._sqlalchemy_engine = create_engine('hive://{}:{}/{}'.format(self.host, self.port, self.schema))
self._sqlalchemy_metadata = MetaData(self._sqlalchemy_engine)
elif self.driver == 'impyla':
try:
import impala.dbapi
except ImportError:
raise ImportError("""
Omniduct is attempting to use the 'impyla' driver, but it
is not installed. Please either install the impyla package,
or reconfigure this Duct to use the 'pyhive' driver.
import puretransport
from pyhive import hive
transport = puretransport.transport_factory(host='host',
port=10000,
username='username',
password='secret',
use_ssl=True)
hive_con = hive.connect(username='username', thrift_transport=transport,
database='default')
cursor = hive_con.cursor()
cursor.execute("select * from table limit 1")
cursor.fetchall()
from flask import Flask, request
import os
import json
from pyhive import hive
cursor = hive.connect('localhost').cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS employee ( eid int, name String, salary String, destignation String) COMMENT 'employee details' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE"
)
cursor.execute(
"LOAD DATA LOCAL INPATH 'sample.txt' OVERWRITE INTO TABLE employee"
)
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hey, we have Flask in a Docker container!'
@app.route('/employee//')
def get_connection(self, host, db='DEFAULT', queue='default'):
return hive.connect(host=host,
database=db,
configuration={'mapred.job.queue.name': queue,
' hive.exec.dynamic.partition.mode': 'nonstrict'})
def run_query(self, query, user):
connection = None
try:
connection = hive.connect(**self.configuration.to_dict())
cursor = connection.cursor()
cursor.execute(query)
column_names = []
columns = []
for column in cursor.description:
column_name = column[COLUMN_NAME]
column_names.append(column_name)
columns.append({
'name': column_name,
'friendly_name': column_name,
'type': types_map.get(column[COLUMN_TYPE], None)
port = ":" + str(port)
http_uri = "{}://{}{}{}".format(scheme, host, port, path)
# create transport
transport = THttpClient.THttpClient(http_uri)
# if username or password is set, add Authorization header
username = self.configuration.get("username", "")
password = self.configuration.get("http_password", "")
if username or password:
auth = base64.b64encode(username + ":" + password)
transport.setCustomHeaders({"Authorization": "Basic " + auth})
# create connection
connection = hive.connect(thrift_transport=transport)
return connection
kerberos_service_name = None
if conf.get('core', 'security') == 'kerberos':
auth_mechanism = db.extra_dejson.get('authMechanism', 'KERBEROS')
kerberos_service_name = db.extra_dejson.get('kerberos_service_name', 'hive')
# pyhive uses GSSAPI instead of KERBEROS as a auth_mechanism identifier
if auth_mechanism == 'GSSAPI':
self.log.warning(
"Detected deprecated 'GSSAPI' for authMechanism "
"for %s. Please use 'KERBEROS' instead",
self.hiveserver2_conn_id
)
auth_mechanism = 'KERBEROS'
from pyhive.hive import connect
return connect(
host=db.host,
port=db.port,
auth=auth_mechanism,
kerberos_service_name=kerberos_service_name,
username=db.login or username,
password=db.password,
database=schema or db.schema or 'default')
def get_connection(self):
"""Gets connection to the Hive server.
Returns:
pyhive.hive.Connection: Hive Connection object.
"""
logger.debug("Getting Hive Connection")
try:
connection = hive.connect(host=self.host, port=self.port,
username=self.user)
return connection
except TTransport.TTransportException as error:
logger.error("Failed to establish Hive connection")
raise custom_exceptions.ConnectionError from error