Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_conn(self, mock_get_connections, mock_insecure_client):
    """get_conn() falls back to the next namenode when the first raises HdfsError."""
    # First client construction fails, second succeeds.
    mock_insecure_client.side_effect = [HdfsError('Error'),
                                        mock_insecure_client.return_value]

    conn = self.webhdfs_hook.get_conn()

    # One client per configured connection, built from host/port/login.
    expected_calls = [
        call('http://{host}:{port}'.format(host=candidate.host, port=candidate.port),
             user=candidate.login)
        for candidate in mock_get_connections.return_value
    ]
    mock_insecure_client.assert_has_calls(expected_calls)
    # The surviving client was health-checked against the root path.
    mock_insecure_client.return_value.status.assert_called_once_with('/')
    self.assertEqual(conn, mock_insecure_client.return_value)
def rename(self, source_file_names, destination_file_names):
    """Rename each source path to the corresponding destination path.

    Per-pair failures are collected and raised together as a single
    BeamIOError after every pair has been attempted, so one bad pair does
    not abort the remaining renames.
    """
    failed = {}
    for src, dst in zip(source_file_names, destination_file_names):
        try:
            src_rel = self._parse_url(src)
            dst_rel = self._parse_url(dst)
            try:
                self._hdfs_client.rename(src_rel, dst_rel)
            except hdfs.HdfsError as e:
                # Wrap the low-level client error with the paths involved.
                raise BeamIOError(
                    'libhdfs error in renaming %s to %s' % (src, dst), e)
        except Exception as e:  # pylint: disable=broad-except
            failed[(src, dst)] = e
    if failed:
        raise BeamIOError('Rename operation failed', failed)
"""
Establishes a connection depending on the security mode set via config or environment variable.
:return: a hdfscli InsecureClient or KerberosClient object.
:rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
"""
# NOTE(review): the enclosing `def` line is missing from this chunk; the
# docstring above suggests this is the hook's get_conn — confirm against
# the full file before relying on this fragment.
connections = self.get_connections(self.webhdfs_conn_id)
# Try each configured namenode in order; the first one that answers a
# cheap read probe wins.
for connection in connections:
try:
self.log.debug('Trying namenode %s', connection.host)
client = self._get_client(connection)
# Liveness probe: a status call on the root path.
client.status('/')
self.log.debug('Using namenode %s for hook', connection.host)
return client
except HdfsError as hdfs_error:
# A failing namenode is tolerated here (presumably for HA setups);
# log at debug level and try the next connection.
self.log.debug('Read operation on namenode %s failed with error: %s',
connection.host, hdfs_error)
# Every namenode failed the probe: report all hosts tried in one error.
hosts = [connection.host for connection in connections]
error_message = 'Read operations failed on the namenodes below:\n{hosts}'.format(
hosts='\n'.join(hosts))
raise AirflowWebHDFSHookException(error_message)
Returns:
List of HDFS paths that satisfy the glob pattern.
"""
# NOTE(review): this chunk begins mid-docstring — the `def` line and the
# docstring opening are outside this view; looks like a glob()
# implementation over HDFS — confirm against the full file.
# Fall back to the filesystem root when the pattern yields no literal
# prefix to start walking from.
prefix = _parse_glob_pattern(pattern) or "/"
matched = set()
try:
# Walk everything under the prefix; fnmatch both directory paths and
# the files they contain against the full pattern.
for dpath, _, fnames in self.walk(prefix):
if fnmatch(dpath, pattern):
matched.add(dpath)
matched |= set(
"{}/{}".format(dpath, fname)
for fname in fnames
if fnmatch("{}/{}".format(dpath, fname), pattern)
)
except HdfsError:  # pragma: no cover
# HdfsError is raised by `self.walk()` if prefix does not exist in HDFS.
# Ignore and return an empty list.
pass
# Sorted for deterministic output regardless of walk order.
return sorted(matched)
def utimens(self, path, times=None):
    """Set access and modification times on *path* (FUSE ``utimens`` handler).

    :param path: path relative to the mount point.
    :param times: ``(atime, mtime)`` in seconds since the epoch, or ``None``
        to set both to the current time (POSIX ``utimens`` semantics).
    :return: result of :func:`os.utime` on the local counterpart of *path*.
    :raises FuseOSError: with ``ENOSYS`` when HDFS reports an ``IOException``.
    """
    import time  # local import keeps this fix self-contained in the block

    log.debug('utimens({}, {})'.format(path, times))
    full_path = self._full_path(path)
    # BUG FIX: FUSE passes times=None to mean "touch to now"; the original
    # crashed with TypeError on times[0] in that case.
    if times is None:
        now = time.time()
        times = (now, now)
    at = int(times[0] * 1000)  # HDFS set_times takes milliseconds
    mt = int(times[1] * 1000)
    try:
        self.hdfs_client.set_times(full_path, access_time=at, modification_time=mt)
    except HdfsError as e:
        if e.exception == 'IOException':
            log.debug(e)
            raise FuseOSError(errno.ENOSYS)
        # Other HdfsErrors fall through silently, matching the original
        # best-effort behavior — NOTE(review): confirm this is intended.
    self._cache['last_cmd'] = 'utimens'
    return os.utime(self._full_path(path), times)
def rename(self, source_file_names, destination_file_names):
# Rename each (source, destination) pair via the HDFS client. Failures
# are collected per pair and raised together as one BeamIOError, so a
# single bad pair does not abort the remaining renames.
exceptions = {}
# NOTE(review): pairs beyond the shorter of the two lists are silently
# dropped by zip() — presumably callers pass equal-length lists; confirm.
for source, destination in zip(source_file_names, destination_file_names):
try:
rel_source = self._parse_url(source)
rel_destination = self._parse_url(destination)
try:
self._hdfs_client.rename(rel_source, rel_destination)
except hdfs.HdfsError as e:
# Wrap the low-level client error with the paths involved; the
# outer handler then records it like any other per-pair failure.
raise BeamIOError(
'libhdfs error in renaming %s to %s' % (source, destination), e)
except Exception as e: # pylint: disable=broad-except
# Record and continue; reported in aggregate below.
exceptions[(source, destination)] = e
if exceptions:
raise BeamIOError('Rename operation failed', exceptions)