Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
将一段内存变量映射为一个P类型实例
Args:
dataset (object): 任意类型的内存变量
options:
serde: 设置dataset的serde对象
Returns:
PType: 表示该内存变量的P类型
"""
objector = options.get("serde", self.default_objector())
local_input_path = "./.local_input"
if os.path.isfile(local_input_path):
raise error.BigflowPlanningException("file ./.local_input exist, "
"cannot use it as temp directory")
if not os.path.exists(local_input_path):
os.makedirs(local_input_path)
file_name = os.path.abspath(local_input_path + "/" + str(uuid.uuid4()))
requests.write_record(file_name,
utils.flatten_runtime_value(dataset),
objector)
self._local_temp_files.append(file_name)
node = self.read(input.SequenceFile(file_name, **options)).node()
nested_level, ptype = utils.detect_ptype(dataset)
if nested_level < 0:
""" 注册一个在 pipeline.run() 执行之后的 hook.
hook 执行顺序: 注册的 name 进行 sorted 排序结果
TODO: support callbacks that take parameters. Until then, users can always wrap a callback
with parameters in a closure to obtain a zero-parameter callback.
:param name: 钩子名称
:param callback: 无参的 callback
:return: None
.. note:: This function is provided for advanced usage; please make sure you know what you are
   doing.
"""
if callable(callback):
self._after_run_hooks[name] = (callback, )
else:
raise error.BigflowPlanningException("Cannot register a non-callable object: %s" %
str(callback))