Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_client(self, connection):
    """Build a WebHDFS client for the given connection.

    The endpoint is ``http://<host>:<port>``, taken from ``connection.host``
    and ``connection.port``.

    :param connection: object exposing ``host``, ``port`` and ``login``.
    :return: a ``KerberosClient`` when the ``_kerberos_security_mode`` flag
        (defined outside this function) is truthy; otherwise an
        ``InsecureClient`` acting as ``self.proxy_user``, falling back to
        ``connection.login`` when no proxy user is set.
    """
    endpoint = 'http://{host}:{port}'.format(
        host=connection.host, port=connection.port)
    if _kerberos_security_mode:
        return KerberosClient(endpoint)
    # Without Kerberos the acting user has to be passed explicitly.
    effective_user = self.proxy_user or connection.login
    return InsecureClient(endpoint, user=effective_user)
def hdfs_client(self):
    """Create a WebHDFS client for the configured namenode.

    The URL is ``http://<self._nn_host>:<self._webhdfs_port>``.

    :return: an ``InsecureClient`` acting as ``self._hdfs_user`` when
        ``self._kerberized`` is falsy; otherwise a ``KerberosClient`` created
        with ``mutual_auth='REQUIRED'``.
    """
    namenode_url = 'http://{nn_host}:{webhdfs_port}'.format(
        nn_host=self._nn_host, webhdfs_port=self._webhdfs_port)
    if not self._kerberized:
        from hdfs.client import InsecureClient
        return InsecureClient(namenode_url, user=self._hdfs_user)
    # The Kerberos extension is imported only when it is actually requested.
    from hdfs.ext.kerberos import KerberosClient
    return KerberosClient(namenode_url, mutual_auth='REQUIRED')
def client(self):
    """Return a new HDFS client for ``self.url`` on every call.

    :return: a ``KerberosClient`` when ``self.client_type`` equals
        ``'kerberos'``; otherwise an ``hdfs.InsecureClient`` acting as
        ``self.user``.
    """
    # The client is deliberately not memoized. A naive benchmark measured
    # 1000 existence checks at 2.5 secs with a reused client versus 4.0 secs
    # when recreating it each time, so caching is not urgent. Memoizing too
    # trivially might also cause issues with process forking and the like
    # (as happened with the snakebite client).
    if self.client_type == 'kerberos':
        from hdfs.ext.kerberos import KerberosClient
        return KerberosClient(url=self.url)
    import hdfs
    return hdfs.InsecureClient(url=self.url, user=self.user)
else:
prefix = 'https' if use_https else 'http'
try:
import requests_kerberos # noqa: F401
except ImportError:
raise IbisError(
"Unable to import requests-kerberos, which is required for "
"Kerberos HDFS support. Install it by executing `pip install "
"requests-kerberos` or `pip install hdfs[kerberos]`."
)
from hdfs.ext.kerberos import KerberosClient
# note SSL
url = '{0}://{1}:{2}'.format(prefix, host, port)
kwds.setdefault('mutual_auth', 'OPTIONAL')
hdfs_client = KerberosClient(url, session=session, **kwds)
else:
if use_https == 'default':
prefix = 'http'
else:
prefix = 'https' if use_https else 'http'
from hdfs.client import InsecureClient
url = '{}://{}:{}'.format(prefix, host, port)
hdfs_client = InsecureClient(url, session=session, **kwds)
return WebHDFS(hdfs_client)
def __init__(self, hdfs_path, nbytes):
    """Store the path and byte count and set up empty shared state.

    :param hdfs_path: stored as ``self._hpath``.
    :param nbytes: stored as ``self._nbytes``.

    Also creates an empty ``self._data`` dict and a fresh ``self._lock``.
    """
    self._hpath, self._nbytes = hdfs_path, nbytes
    self._lock = Lock()
    self._data = {}
def __call__(self):
    """Record this path's byte count, or report the accumulated total.

    While holding ``self._lock``: a non-negative ``self._nbytes`` is stored
    in ``self._data`` under ``self._hpath``. Any other value leaves
    ``self._data`` untouched and writes the sum of the stored values,
    followed by a newline, to ``stderr``.
    """
    with self._lock:
        if self._nbytes >= 0:
            self._data[self._hpath] = self._nbytes
            return
        total = sum(self._data.values())
        stderr.write('%s\n' % (total, ))
class SecureKerberosClient(KerberosClient):
"""A new client subclass for handling HTTPS connections with Kerberos.
:param url: URL to namenode.
:param cert: Local certificate. See `requests` documentation for details
on how to use this.
:param verify: Whether to check the host's certificate. WARNING: non production use only
:param \*\*kwargs: Keyword arguments passed to the default `Client`
constructor.
"""
def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
session = Session()
def hdfs_connection(self, parsed_uri, user=None):
    """Create a WebHDFS client for the host named in ``parsed_uri``.

    Only the hostname is taken from the URI; the port is always the
    configured ``self.port``.

    :param parsed_uri: object exposing a ``hostname`` attribute.
    :param user: user for the insecure client; ``self.user`` is used when
        this is falsy.
    :return: a ``KerberosClient`` when ``self.use_kerberos`` is truthy,
        otherwise an ``InsecureClient`` acting as the resolved user.
    """
    endpoint = 'http://{}:{}'.format(parsed_uri.hostname, self.port)
    effective_user = user or self.user
    if not self.use_kerberos:
        from hdfs import client as insecure_module
        return insecure_module.InsecureClient(endpoint, user=effective_user)
    from hdfs.ext import kerberos as kerberos_module
    return kerberos_module.KerberosClient(endpoint)