def _log_status(self, cursor, log_offset=0):
    # Forward any new server-side log lines to the logger, stripping the
    # "<date> <time> INFO " prefix, and return the new offset for polling.
    matcher = re.compile(r'[0-9/]+ [0-9:]+ (INFO )?')
    if self.driver == 'pyhive':
        log = cursor.fetch_logs()
    else:
        log = cursor.get_log().strip().split('\n')
    for line in log[log_offset:]:
        if not line:
            continue
        m = matcher.match(line)
        if m:
            line = line[len(m.group(0)):]
        logger.info(line)
    return len(log)
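# The returned length can be fed back in as `log_offset` on the next call, so
# logs are emitted incrementally while a query runs. A minimal sketch of such
# a polling loop, assuming PyHive's asynchronous execution API (`cursor` and
# `statement` are assumed from the surrounding client code; older PyHive
# releases spell the flag `async=True`):
from TCLIService.ttypes import TOperationState  # ships with PyHive

cursor.execute(statement, async_=True)
offset = 0
while cursor.poll().operationState in (
        TOperationState.INITIALIZED_STATE,
        TOperationState.RUNNING_STATE):
    offset = self._log_status(cursor, log_offset=offset)
self._log_status(cursor, log_offset=offset)  # flush any trailing lines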
def get_free_local_port():
    """
    Return a random free port.

    Returns
    -------
    free_port : int
        A free local port.
    """
    s = socket.socket()
    s.bind(("", 0))  # binding to port 0 lets the OS pick any free port
    free_port = s.getsockname()[1]
    s.close()
    logger.info('found port {0}'.format(free_port))
    return free_port
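# Note that the socket is closed before the port is used, so another process
# could in principle claim the port in the interim. A typical (illustrative)
# pairing with the port-forwarding helper below, from inside the remote
# client; the host and port are hypothetical:
local_port = get_free_local_port()
self._port_forward_start(local_port, 'db.internal.example.com', 5432)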
def _disconnect(self):
    logger.info('Disconnecting from Neo4J graph database ...')
    try:
        self.__driver.close()
    except Exception:
        pass  # the driver may already be closed (or never have connected)
    self.__driver = None
def _port_forward_start(self, local_port, remote_host, remote_port):
    logger.info('Establishing port forward...')
    # `-O forward` asks the existing ssh ControlMaster (reached via the `-S`
    # control socket) to add a forwarding without opening a new session.
    cmd_template = (
        'ssh {login} -T -O forward -S {socket} '
        '-L localhost:{local_port}:{remote_host}:{remote_port}'
    )
    cmd = cmd_template.format(
        login=self._login_info,
        socket=self._socket_path,
        local_port=local_port,
        remote_host=remote_host,
        remote_port=remote_port,
    )
    proc = run_in_subprocess(cmd)
    if proc.returncode != 0:
        raise Exception('Unable to port forward with command: {}'.format(cmd))
    logger.info(proc.stderr or 'Success')
    return proc
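# `run_in_subprocess` is an omniduct helper. A minimal stand-in built on the
# standard library (an assumption, not the library's actual implementation),
# shaped to satisfy the `returncode`/`stderr` attribute access above:
import subprocess

def run_in_subprocess(cmd, check_output=False):
    # Run `cmd` through the shell; stderr is always captured (as text) so
    # callers can log it, stdout only when `check_output` is requested.
    return subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE if check_output else None,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )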
def __register_implementation__(cls):
    # Register this class in a shared registry, keyed both by class name and
    # by any protocol names advertised via its `PROTOCOLS` attribute.
    if not hasattr(cls, '_protocols'):
        cls._protocols = {}
    cls._protocols[cls.__name__] = cls

    registry_keys = getattr(cls, 'PROTOCOLS', []) or []
    for key in registry_keys:
        if key in cls._protocols and cls.__name__ != cls._protocols[key].__name__:
            logger.info(
                "Ignoring attempt by class `{}` to register key '{}', which is "
                "already registered for class `{}`.".format(
                    cls.__name__, key, cls._protocols[key].__name__
                )
            )
        else:
            cls._protocols[key] = cls
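# A sketch of how such a hook is typically wired up, using `__init_subclass__`
# for illustration (omniduct's real class hierarchy and registration machinery
# differ; `DuctBase` and the pre-declared registry are assumptions, and the
# `__register_implementation__` and `logger` from the snippet above are
# assumed to be in scope):
class DuctBase:
    _protocols = {}  # shared registry, pre-declared on the base
    PROTOCOLS = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        __register_implementation__(cls)  # register every new subclass

class HiveServer2Client(DuctBase):
    PROTOCOLS = ['hiveserver2']

# Implementations can then be looked up by protocol key:
assert DuctBase._protocols['hiveserver2'] is HiveServer2Client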
# Try using Hive CLI

# If `partition` is specified, the associated columns must not be
# present in the dataframe.
assert len(set(partition).intersection(df.columns)) == 0, (
    "The dataframe to be uploaded must not have any partitioned fields. "
    "Please remove the field(s): {}.".format(
        ','.join(set(partition).intersection(df.columns)))
)

# Save the dataframe to a local file, and send it to the remote server if
# necessary. `\N` is Hive's default text-format NULL marker.
temp_dir = tempfile.mkdtemp(prefix='omniduct_hiveserver2')
tmp_fname = os.path.join(temp_dir, 'data_{}.csv'.format(time.time()))
logger.info('Saving dataframe to file... {}'.format(tmp_fname))
df.fillna(r'\N').to_csv(tmp_fname, index=False, header=False,
                        sep=sep, encoding='utf-8')

if self.remote:
    logger.info("Uploading data to remote host...")
    self.remote.upload(tmp_fname)

# Generate the CREATE TABLE statement.
auto_table_props = set(self.default_table_props).difference(table_props)
if len(auto_table_props) > 0:
    logger.warning(
        "In addition to any specified table properties, this "
        "HiveServer2Client has added the following default table "
        "properties:\n{default_props}\nTo override them, please "
        "specify overrides using: `.push(..., table_props={{...}})`."
        .format(default_props=json.dumps({
            prop: value for prop, value in self.default_table_props.items()
            if prop in auto_table_props
        }, indent=True))
    )
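# As the warning suggests, callers can override the defaults by supplying
# their own `table_props` on push. An illustrative call (the client handle,
# table name, and property shown are assumptions, not documented defaults):
hive_client.push(
    df,
    'analytics.my_table',
    table_props={'parquet.compression': 'SNAPPY'},
)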
def _disconnect(self):
    logger.info('Disconnecting from Druid database ...')
    try:
        self.__druid.close()
    except Exception:
        pass
    self.__druid = None
"""
from ..filesystems.local import LocalFsClient
if fs is None or isinstance(fs, LocalFsClient):
logger.info('Copying file from local...')
dest = dest or posixpath.basename(source)
cmd = (
"scp -r -o ControlPath={socket} '{local_file}' {login}:'{remote_file}'".format(
socket=self._socket_path,
local_file=source.replace('"', r'\"'), # quote escaped for bash
login=self._login_info,
remote_file=dest.replace('"', r'\"'),
)
)
proc = run_in_subprocess(cmd, check_output=True)
logger.info(proc.stderr or 'Success')
else:
return super(RemoteClient, self).upload(source, dest, overwrite, fs)
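# Illustrative usage of the client's `upload` (the paths and the explicit
# `overwrite`/`fs` arguments are assumptions based on the signature above):
remote = RemoteClient(...)  # connection details elided
remote.upload('/tmp/report.csv', '/home/analyst/report.csv', True, None)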