Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read(self, source):
"""
将外部存储的数据映射为一个PCollection,并在运行时读取数据
Args:
source (Source): 表示外部存储的Source实例
Returns:
PCollection: 读取结果
"""
from bigflow import input
#from bigflow.io import ComplexInputFormat
if isinstance(source, input.UserInputBase):
source = input.user_define_format(source)
#elif isinstance(source, ComplexInputFormat):
# return source.load_by(self)
from bigflow.core import entity
input_format_entity = entity.Entity.of(entity.Entity.loader, source.input_format)
ugi = source.ugi if hasattr(source, "ugi") else None
uris = map(lambda uri: self._transform_uri(uri, input_format_entity.name, ugi),
source.uris)
def _sum(arr, b):
if not isinstance(b, list):
arr.append(b)
else:
arr.extend(b)
return arr
uris = reduce(_sum, uris, [])