Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
clean = []
for part in pattern.split("/"):
if any(char in part for char in special):
break
clean.append(part)
return "/".join(clean)
def _split_filepath(filepath: str) -> Tuple[str, str]:
split_ = filepath.split("://", 1)
if len(split_) == 2:
return split_[0] + "://", split_[1]
return "", split_[0]
class KedroHdfsInsecureClient(InsecureClient):
"""Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
and ``hdfs_glob`` methods required by ``SparkDataSet``"""
def hdfs_exists(self, hdfs_path: str) -> bool:
    """Check whether ``hdfs_path`` is present in HDFS.

    Args:
        hdfs_path: Path to look up.

    Returns:
        True if ``hdfs_path`` exists in HDFS, False otherwise.
    """
    # NOTE(review): strict=False presumably makes ``status`` return a falsy
    # value for a missing path instead of raising - confirm against the
    # ``hdfs`` client documentation.
    path_status = self.status(hdfs_path, strict=False)
    return bool(path_status)
def hdfs_glob(self, pattern: str) -> List[str]:
"""Perform a glob search in HDFS using the provided pattern.
def get_hdfs_client():
    """Return an ``InsecureClient`` for ``get_hdfs_address()`` with root ``/``."""
    address = get_hdfs_address()
    return InsecureClient(address, root='/')
def _get_client(self, connection):
    """Build an HDFS client for ``connection``.

    The client URL is ``http://<connection.host>:<connection.port>``.
    A ``KerberosClient`` is returned when ``_kerberos_security_mode`` is
    truthy; otherwise an ``InsecureClient`` is returned whose user is
    ``self.proxy_user``, falling back to ``connection.login``.
    """
    url = 'http://{host}:{port}'.format(host=connection.host, port=connection.port)

    if _kerberos_security_mode:
        return KerberosClient(url)

    # The connection's own login is used only when no proxy user is set.
    effective_user = self.proxy_user or connection.login
    return InsecureClient(url, user=effective_user)
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
if hdfs_host is None:
raise ValueError('hdfs_host is not set')
if hdfs_port is None:
raise ValueError('hdfs_port is not set')
if hdfs_user is None:
raise ValueError('hdfs_user is not set')
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
def __init__(self, option=None):
    """Create the HDFS client from an options mapping.

    ``option`` falls back to ``settings.HDFS_STORAGE_OPTIONS`` when it is
    falsy. The keys read are ``HOST`` (default ``'default'``), ``PORT``
    (default ``50070``) and ``USER`` (default ``None``).
    """
    options = option or settings.HDFS_STORAGE_OPTIONS
    host = options.get('HOST', 'default')
    port = options.get('PORT', 50070)
    user = options.get('USER', None)
    self.client = InsecureClient(url='http://{0}:{1}'.format(host, port), user=user)
def client(self):
    """Return a new HDFS client chosen by ``self.client_type``.

    ``'kerberos'`` yields a ``KerberosClient`` for ``self.url``; any other
    value yields an ``hdfs.InsecureClient`` for ``self.url`` and
    ``self.user``.
    """
    # The client is deliberately rebuilt on every call. A naive benchmark
    # put 1000 existence checks at 2.5 secs with a reused client versus
    # 4.0 secs when recreating it, so memoizing is not urgent. Memoizing too
    # trivially *might* also cause issues with process forking and whatnot
    # (as happened with the snakebite client).
    if self.client_type != 'kerberos':
        import hdfs
        return hdfs.InsecureClient(url=self.url, user=self.user)

    from hdfs.ext.kerberos import KerberosClient
    return KerberosClient(url=self.url)
def hdfs_client():
    """Return an ``InsecureClient`` for port 50070 on the ``master_ip()`` host.

    The client's user is ``config.HDFS_USER``.
    """
    # TODO: Configure from env
    url = 'http://%s:50070' % master_ip()
    return InsecureClient(url, user=config.HDFS_USER)