# Prevent race condition when using multiple threads: the exists/makedirs
# pair is not atomic, so tolerate a directory created concurrently
# (requires `import errno`; `os.makedirs(_dpath, exist_ok=True)` on Python 3.2+).
try:
    os.makedirs(_dpath)
except OSError as err:
    if err.errno != errno.EEXIST:
        raise
with open(_temp_path, 'wb') as _writer:
    with self.read(_remote_path, **kwargs) as reader:
        for chunk in reader:
            _writer.write(chunk)

# First, we figure out where we will download the files to.
hdfs_path = self.resolve(hdfs_path)
local_path = osp.realpath(local_path)
if osp.isdir(local_path):
    local_path = osp.join(local_path, psp.basename(hdfs_path))
if osp.exists(local_path):
    if not overwrite:
        raise HdfsError('Path %r already exists.', local_path)
    local_dpath, local_name = osp.split(local_path)
    temp_dir = temp_dir or local_dpath
    temp_path = osp.join(
        temp_dir,
        '%s.temp-%s' % (local_name, _current_micros())
    )
    _logger.debug(
        'Download destination %r already exists. Using temporary path %r.',
        local_path, temp_path
    )
else:
    if not osp.isdir(osp.dirname(local_path)):
        raise HdfsError('Parent directory of %r does not exist.', local_path)
    temp_path = local_path
# Then we figure out which files we need to download and where.
remote_paths = list(self.walk(hdfs_path, depth=0, status=False))
if not remote_paths:
    # This is a single file.
    remote_fpaths = [hdfs_path]
else:
    remote_fpaths = [
        psp.join(dpath, fname)
        for dpath, _, fnames in remote_paths
        for fname in fnames
    ]
    if not remote_fpaths:
        raise HdfsError('No files to download found inside %r.', hdfs_path)
offset = len(hdfs_path) + 1  # Prefix length.
fpath_tuples = [
    # Reconstructed body (assumption; the source was truncated after the
    # opening bracket): pair each remote file with its local temporary
    # destination, translating the remote separator to the local one.
    (fpath, osp.join(temp_path, fpath[offset:].replace('/', os.sep)))
    for fpath in remote_fpaths
]
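
# The logic above implements the body of the client's `download` method.
# A minimal usage sketch (URL, user, and paths are illustrative assumptions):
from hdfs import InsecureClient

client = InsecureClient('http://namenode:50070', user='hdfs')
client.download('/data/logs', 'logs/', overwrite=True, n_threads=4)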

if verify == 'true':
    self._logger.info('SSL verification enabled')
    session.verify = True
    if cert is not None:
        self._logger.info('SSL Cert: ' + cert)
        if ',' in cert:
            # A comma-separated value holds a separate certificate and key.
            session.cert = [path.strip() for path in cert.split(',')]
        else:
            session.cert = cert
elif verify == 'false':
    session.verify = False
super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
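
# Hypothetical instantiation, assuming `verify`, `cert`, and `mutual_auth`
# are constructor parameters as the body above suggests, and a
# Kerberos-secured WebHDFS endpoint:
client = SecureKerberosClient(
    'https://namenode:50470',
    verify='true',
    cert='/etc/ssl/hdfs-client.pem, /etc/ssl/hdfs-client.key',
)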

class HdfsException(HdfsError):
    def __init__(self, message):
        super(HdfsException, self).__init__(message)
        self.message = message

def get_client(user=None):
    # type: (object) -> Client
    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}
    if Config.ssl_enabled():
        ssl_verify, ca_location, cert, key = Config.ssl()
        conf.update({'verify': ssl_verify.lower()})
        if cert:
            # Assumed continuation of the truncated excerpt: forward the
            # client certificate as well.
            conf.update({'cert': cert})

def exists(self, path):
    """Return True if the path exists and False otherwise."""
    import hdfs
    try:
        self.client.status(path)
        return True
    except hdfs.util.HdfsError as e:
        # The underlying library signals a missing path via the error
        # message, so match on its prefix and re-raise anything else.
        if str(e).startswith('File does not exist: '):
            return False
        else:
            raise e
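
# Minimal usage sketch, assuming `fs` is an instance of the wrapper class
# above (names and paths are illustrative):
if not fs.exists('/data/input'):
    raise RuntimeError('Missing input directory.')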

def _to_error(response):
    """Callback when an API response has a non-2XX status code.

    :param response: Response.
    """
    if response.status_code == 401:
        _logger.error(response.content)
        raise HdfsError('Authentication failure. Check your credentials.')
    try:
        # Cf. http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#Error+Responses
        message = response.json()['RemoteException']['message']
    except ValueError:
        # No JSON error payload to parse; fall back to the raw response content.
        message = response.content
    try:
        exception = response.json()['RemoteException']['exception']
    except ValueError:
        exception = None
    return HdfsError(message, exception=exception)
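
# Illustrative call site (a sketch; `response` is assumed to be a
# `requests.Response`): the helper raises directly on 401 and otherwise
# returns an `HdfsError` for the caller to raise.
if not response.ok:
    raise _to_error(response)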

@catch(HdfsError)
def main(argv=None, client=None, stdin=sys.stdin, stdout=sys.stdout):
    """Entry point.

    :param argv: Arguments list.
    :param client: For testing.
    """
    args = docopt(__doc__, argv=argv)
    if not client:
        client = configure_client('hdfscli-avro', args)
    elif args['--log']:
        raise HdfsError('Logging is only available when no client is specified.')
    overwrite = args['--force']
    parts = parse_arg(args, '--parts', int, ',')
    if args['write']:
        writer = AvroWriter(

def write(self, record):
    """Store a record.

    Only available inside the `with` block.

    :param record: Record object to store.
    """
    if not self._entered:
        raise HdfsError('Avro writer not available outside context block.')
    if not self._schema:
        # Lazily infer the schema from the first record, then start the
        # underlying writer.
        self._schema = _SchemaInferrer().infer(record)
        _logger.info('Inferred schema: %s', dumps(self._schema))
        self._start_writer()
    self._writer.write(record)
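
# A usage sketch of the writer's context-manager contract (the client, path,
# and record are illustrative; the schema is inferred from the first record):
with AvroWriter(client, 'data/records.avro', overwrite=True) as writer:
    writer.write({'name': 'Ann', 'age': 23})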

# `dns_storyboard` is referenced below but missing from this excerpt; the
# value here is an assumption mirroring the other table paths.
dns_storyboard = "dns/hive/oa/storyboard"
dns_threat_investigation = "dns_threat_dendro/hive/oa/threat_dendro"
dns_timeline = "dns/hive/oa/threat_investigation"
app_path = Configuration.spot()
try:
    # Remove the parquet files manually to allow the comments update.
    HDFSClient.delete_folder("{0}/{1}/y={2}/m={3}/d={4}/".format(
        app_path, dns_storyboard, date.year, date.month, date.day), "impala")
    HDFSClient.delete_folder("{0}/{1}/y={2}/m={3}/d={4}/".format(
        app_path, dns_threat_investigation, date.year, date.month, date.day), "impala")
    HDFSClient.delete_folder("{0}/{1}/y={2}/m={3}/d={4}/".format(
        app_path, dns_timeline, date.year, date.month, date.day), "impala")
    ImpalaEngine.execute_query("invalidate metadata")
    return True
except HdfsError:
    return False
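
# Illustrative expansion of the partition-path template above (values are
# assumptions): with app_path '/user/spot' and date(2019, 5, 6), the first
# call removes '/user/spot/dns/hive/oa/storyboard/y=2019/m=5/d=6/'.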

def rename(self, hdfs_src_path, hdfs_dst_path):
    """Move a file or folder.

    :param hdfs_src_path: Source path.
    :param hdfs_dst_path: Destination path. If the path already exists and is
        a directory, the source will be moved into it. If the path exists and
        is a file, or if a parent destination directory is missing, this
        method will raise an :class:`HdfsError`.
    """
    _logger.info('Renaming %r to %r.', hdfs_src_path, hdfs_dst_path)
    hdfs_dst_path = self.resolve(hdfs_dst_path)
    res = self._rename(hdfs_src_path, destination=hdfs_dst_path)
    if not res.json()['boolean']:
        raise HdfsError(
            'Unable to rename %r to %r.',
            self.resolve(hdfs_src_path), hdfs_dst_path
        )
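
# Minimal usage sketch (assumes a connected client instance; the paths are
# illustrative):
client.rename('/tmp/report.csv', '/archive/2019/report.csv')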

@catch(HdfsError)
def main(argv=None, client=None):
    """Entry point.

    :param argv: Arguments list.
    :param client: For testing.
    """
    args = docopt(__doc__, argv=argv, version=__version__)
    if not client:
        client = configure_client('hdfscli', args)
    elif args['--log']:
        raise HdfsError('Logging is only available when no client is specified.')
    hdfs_path = args['HDFS_PATH']
    local_path = args['LOCAL_PATH']
    n_threads = parse_arg(args, '--threads', int)
    force = args['--force']
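
# This entry point backs the `hdfscli` command line. An illustrative
# invocation (the alias and paths are assumptions):
#
#   $ hdfscli download --alias=dev --threads=4 /data/logs logs/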