try:
    from pyhive.sqlalchemy_presto import PrestoDialect

    def get_columns(self, connection, table_name, schema=None, **kw):
        rows = self._get_table_columns(connection, table_name, schema)
        result = []
        for row in rows:
            try:
                coltype = type_map[row.Type]
            except KeyError:
                logger.warning("Did not recognize type '%s' of column '%s'" % (row.Type, row.Column))
                coltype = sql_types.NullType
            result.append({
                'name': row.Column,
                'type': coltype,
                # newer Presto no longer includes this column
                'nullable': getattr(row, 'Null', True),
                'default': None,
            })
        return result

    PrestoDialect.get_columns = get_columns
except ImportError:
    logger.debug("Not monkey patching pyhive's PrestoDialect.get_columns due to missing dependencies.")
class SchemasMixin(object):
    """
    Attaches a tab-completable `.schemas` attribute to a `DatabaseClient` instance.

    It is currently implemented as a mixin rather than directly provided on the
    base class because it requires that the host `DatabaseClient` instance have
    a `sqlalchemy` metadata object handle, and not all backends support this.
    If we are willing to forgo the ability to actually make queries using the
    SQLAlchemy ORM, we could instead use a SQL-agnostic version.
    """
    @property
    @require_connection
    def schemas(self):
        # Lazily expose the database's schemas for tab completion.
        ...


def _port_forward_start(self, local_port, remote_host, remote_port):
    logger.debug('Now forwarding port {} to {}:{} ...'.format(local_port, remote_host, remote_port))
    server = None
    try:
        server = forward_tunnel(local_port, remote_host, remote_port, self.__client.get_transport())
    except KeyboardInterrupt:
        print('C-c: Port forwarding stopped.')
    return server
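
# Hypothetical usage sketch for the helper above. `client` is assumed to be a
# connected paramiko.SSHClient, and the server returned by `forward_tunnel`
# (defined elsewhere in this module) is assumed to support the standard
# socketserver-style `shutdown()` to stop forwarding.
#
#   server = _port_forward_start(self, 8080, 'db.internal', 5432)
#   ...  # use the tunnel via localhost:8080
#   server.shutdown()
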
            table=table,
            partition_clause=partition_clause
        )
        # Run create-table and load-data statements
        logger.info(
            "Creating hive table `{table}` if it does not "
            "already exist, and inserting the provided data{partition}."
            .format(
                table=table,
                partition=" into {}".format(partition_clause) if partition_clause else ""
            )
        )
        try:
            stmts = '\n'.join([cts, lds])
            logger.debug(stmts)
            proc = self._run_in_hivecli(stmts)
            if proc.returncode != 0:
                raise RuntimeError(proc.stderr.decode('utf-8'))
        finally:
            # Clean up temporary files on the remote and local hosts
            if self.remote:
                self.remote.execute('rm -rf {}'.format(tmp_fname))
            shutil.rmtree(temp_dir, ignore_errors=True)
logger.info("Successfully uploaded dataframe {partition}`{table}`.".format(
table=table,
partition="into {} of ".format(partition_clause) if partition_clause else ""
))
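
# For orientation, the `cts`/`lds` statements joined above take roughly this
# shape; the database, table, file path and partition spec below are
# assumptions for the example, not values produced by this module.
#
#   CREATE TABLE IF NOT EXISTS my_db.my_table (`id` BIGINT, `name` STRING);
#   LOAD DATA LOCAL INPATH '/tmp/upload.csv'
#   INTO TABLE my_db.my_table PARTITION (ds='2020-01-01');
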
def run_in_subprocess(cmd, check_output=False, **kwargs):
    """
    Execute command using default subprocess configuration.

    Parameters
    ----------
    cmd : string
        Command to be executed in subprocess.
    check_output : bool
        Whether to capture the command's stdout/stderr rather than
        forwarding or suppressing it.
    kwargs : keywords
        Options to pass to subprocess.Popen.

    Returns
    -------
    proc : Popen subprocess
        Subprocess used to run command.
    """
    logger.debug('Executing command: {0}'.format(cmd))
    config = DEFAULT_SUBPROCESS_CONFIG.copy()
    config.update(kwargs)
    if not check_output:
        if omniduct_config.logging_level < 20:  # below logging.INFO, i.e. DEBUG
            config['stdout'] = None
            config['stderr'] = None
        else:
            config['stdout'] = open(os.devnull, 'w')
            config['stderr'] = open(os.devnull, 'w')
    timeout = config.pop('timeout', None)
    process = subprocess.Popen(cmd, **config)
    try:
        stdout, stderr = process.communicate(None, timeout=timeout)
    except subprocess.TimeoutExpired:
        # Send SIGINT to the process group, recursively killing all children.
        os.killpg(os.getpgid(process.pid), signal.SIGINT)
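
# A minimal sketch of what DEFAULT_SUBPROCESS_CONFIG is assumed to contain
# (it is not shown in this excerpt): for the os.killpg() call above to reach
# only the spawned command and its children, rather than the caller's own
# group, the child must lead its own process group, e.g. via
# start_new_session=True (equivalent to preexec_fn=os.setsid).
#
#   DEFAULT_SUBPROCESS_CONFIG = {
#       'shell': True,
#       'start_new_session': True,  # child becomes its own process-group leader
#       'stdout': subprocess.PIPE,
#       'stderr': subprocess.PIPE,
#   }
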
def get(self, id_duct, id_str, deserializer=pickle.load):
    # Cache misses are signalled by returning None.
    cache_path = self.get_path(id_duct, id_str)
    if not os.path.exists(cache_path):
        return None
    with open(cache_path, 'rb') as f:
        logger.debug("Loading local cache entry from '{}'.".format(cache_path))
        return deserializer(f)
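
# Hypothetical round-trip with the cache above; 'my_duct', 'my_key' and the
# complementary `set` method are assumptions for illustration only.
#
#   cache.set('my_duct', 'my_key', value, serializer=pickle.dump)
#   value = cache.get('my_duct', 'my_key')  # -> None on a cache miss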