Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build the HDFS client from the parsed configuration and mount it via FUSE.
# NOTE(review): `cfg`, `hdfs_server` and `hdfs_mount_root` are defined before
# this fragment (not visible here); `cfg` is presumably a nested mapping loaded
# from a config file -- confirm against the caller.
import sys

hdfs_user = cfg['hdfs']['hdfs_user']
hdfs_group = cfg['hdfs']['hdfs_group']
mount_dest_dir = cfg['mount']['dest_dir']

# Optional extra keyword arguments forwarded verbatim to FUSE(). `or {}` also
# covers a present-but-empty `extra` entry, which would otherwise be passed as
# `**None` and raise TypeError.
mount_extra_params = cfg['mount'].get('extra') or {}

if not os.path.isdir(mount_dest_dir):
    # Fatal configuration error: report on stderr and exit non-zero. Use
    # sys.exit rather than the `site`-injected `exit`, which is meant for the
    # interactive interpreter and is absent when `site` is not loaded.
    print(
        'Directory {0} does not exist, please specify an existing directory.'
        .format(mount_dest_dir),
        file=sys.stderr,
    )
    sys.exit(1)

# Choose the authentication flavour; a missing `kerberos` entry means "off".
if cfg['hdfs'].get('kerberos', False):
    hdfs_client = KerberosClient(hdfs_server)
else:
    hdfs_client = Client(hdfs_server)

operations = HDFS(hdfs_client, hdfs_mount_root, hdfs_user, hdfs_group)
# raw_fi/foreground are fixed here; `mount_extra_params` may add further
# FUSE options from the config.
FUSE(operations, mountpoint=mount_dest_dir, raw_fi=False, foreground=True,
     **mount_extra_params)
@property
def hdfs_client(self):
    """Build a WebHDFS client pointed at this object's namenode.

    The client is created from ``self.namenode_url`` with ``self.root_dir`` as
    its root path. Nothing is cached: every attribute access constructs and
    returns a brand-new ``hdfs.client.Client``.
    """
    factory = hdfs.client.Client
    base_url = self.namenode_url
    return factory(base_url, root=self.root_dir)
containing module. If using the CLI, you can use the `autoload.modules` and
`autoload.paths` options.
"""
try:
return cls.__registry__[class_name](**options)
except KeyError:
raise HdfsError('Unknown client class: %r', class_name)
except TypeError:
raise HdfsError('Invalid options: %r', options)
# Custom client classes
# ---------------------
class InsecureClient(Client):

    r"""HDFS web client to use when security is off.

    :param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
      followed by WebHDFS port on namenode
    :param user: User default. Defaults to the current user, as returned by
      `getuser()`.
    :param \*\*kwargs: Keyword arguments passed to the base class' constructor.

    Note that if a session argument is passed in, it will be modified in-place to
    support authentication.

    """

    def __init__(self, url, user=None, **kwargs):
        user = user or getuser()
        session = kwargs.get('session')
        if session is None:
            # Only build a default session when the caller did not supply one.
            # `setdefault('session', rq.Session())` constructs (and discards) a
            # Session even when one is passed, and keeps an explicit
            # `session=None`, which then fails on `.params` below.
            session = kwargs['session'] = rq.Session()
        if not session.params:
            # `params` may be None or empty on a caller-provided session.
            session.params = {}
        # Every request sent through this session carries the user name as
        # the `user.name` query parameter.
        session.params['user.name'] = user
        super(InsecureClient, self).__init__(url, **kwargs)
class TokenClient(Client):
"""HDFS web client using Hadoop token delegation security.
:param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
followed by WebHDFS port on namenode
:param token: Hadoop delegation token.
:param \*\*kwargs: Keyword arguments passed to the base class' constructor.
Note that if a session argument is passed in, it will be modified in-place to
support authentication.
"""
def __init__(self, url, token, **kwargs):
session = kwargs.setdefault('session', rq.Session())
if not session.params:
self._lock = Lock()
self._sem = Semaphore(max_concurrency)
self._timestamp = time() - self._delay
super(_HdfsHTTPKerberosAuth, self).__init__(**kwargs)
    def __call__(self, req):
        """Authenticate ``req``, throttling concurrent and back-to-back attempts.

        Two guards run before delegating to the parent class' ``__call__``:

        * ``self._sem`` (built from ``max_concurrency`` in the initializer)
          caps how many callers can be inside this method at the same time.
        * ``self._lock`` serializes the spacing check so consecutive
          authentication attempts start at least ``self._delay`` seconds apart.

        :param req: Outgoing request to authenticate; passed unchanged to the
          parent class (presumably a ``requests`` prepared request -- confirm).
        :returns: Whatever the parent class' ``__call__`` returns.
        """
        with self._sem:
            with self._lock:
                # Seconds still to wait until `self._delay` has elapsed since
                # the previous attempt; non-positive means proceed immediately.
                delay = self._timestamp + self._delay - time()
                if delay > 0:
                    sleep(delay) # Avoid replay errors.
                # Record this attempt's start time (after any sleep) while the
                # lock is still held, so the next caller spaces itself from it.
                self._timestamp = time()
            # The lock is released before the (possibly slow) parent call, but
            # the semaphore is still held, so concurrency remains capped.
            return super(_HdfsHTTPKerberosAuth, self).__call__(req)
class KerberosClient(Client):
"""HDFS web client using Kerberos authentication.
:param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
followed by WebHDFS port on namenode.
:param mutual_auth: Whether to enforce mutual authentication or not (possible
values: `'REQUIRED'`, `'OPTIONAL'`, `'DISABLED'`).
:param max_concurrency: Maximum number of allowed concurrent authentication
requests. This is required since requests exceeding the threshold allowed
by the server will be unable to authenticate.
:param proxy: User to proxy as.
:param root: Root path, this will be prefixed to all HDFS paths passed to the
client. If the root is relative, the path will be assumed relative to the
user's home directory.
:param timeout: Connection timeouts, forwarded to the request handler. How
long to wait for the server to send data before giving up, as a float, or a