Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param python_file: The python file which is going to be run
:param func: The user-defined function that runs TF inference. If it's None, inference is run via Java API.
:param properties: User-defined properties
:param env_path: Path to the virtual env
:param zk_conn: The Zookeeper connection string
:param zk_base_path: The Zookeeper base path
:param stream_env: The StreamExecutionEnvironment. If it's None, this method will create one and execute the job
at the end. Otherwise, caller is responsible to trigger the job execution
:param input_ds: The input DataStream
:param output_row_type: The RowType for the output DataStream. If it's None, a dummy sink will be added to the
output DataStream. Otherwise, caller is responsible to add sink before executing the job.
:return: The output DataStream. Currently it's always of type Row.
"""
tf_config = TFConfig(num_worker, num_ps, python_file, func, properties, env_path, zk_conn, zk_base_path)
if stream_env is None:
stream_env = StreamExecutionEnvironment.get_execution_environment()
if input_ds is not None:
if isinstance(input_ds, DataStreamSource):
input_ds = input_ds._j_datastream_source
else:
input_ds = input_ds._j_datastream
output_ds = get_gateway().jvm.com.alibaba.flink.ml.tensorflow.client.TFUtils.inference(stream_env._j_stream_execution_environment,
input_ds,
tf_config.java_config(),
to_java_type_info(output_row_type))
stream_env.execute()
return DataStream(output_ds)
def addTrainChiefAloneTable():
    """Run the 'add.py' training job with chief-alone mode enabled.

    Builds a StreamExecutionEnvironment / StreamTableEnvironment pair,
    points the job at the test script ``add.py`` and invokes ``train``.
    All optional inputs (virtualenv path, Zookeeper connection/base path,
    input table, output schema) are left as ``None`` so ``train`` uses its
    defaults and triggers job execution itself.
    """
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(stream_env)
    work_num = 2
    ps_num = 1
    python_file = os.getcwd() + "/../../src/test/python/add.py"
    func = "map_func"
    # Renamed from 'property' to avoid shadowing the built-in.
    props = {}
    # BUG FIX: the value was misspelled "ture", which a boolean parse on the
    # Java side would treat as false — silently disabling the chief-alone
    # mode this test is meant to exercise.
    props[TFCONSTANS.TF_IS_CHIEF_ALONE] = "true"
    env_path = None
    zk_conn = None
    zk_base_path = None
    input_tb = None
    output_schema = None
    train(work_num, ps_num, python_file, func, props, env_path, zk_conn, zk_base_path, stream_env, table_env,
          input_tb, output_schema)
def addTrainTable():
    """Run the 'add.py' training job via the Table API with all defaults.

    Every optional input (properties, virtualenv path, Zookeeper
    connection/base path, input table, output schema) is left as ``None``;
    ``train`` builds its own TFConfig and triggers job execution itself.
    """
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    tbl_env = StreamTableEnvironment.create(exec_env)
    script = os.getcwd() + "/../../src/test/python/add.py"
    train(
        2,            # number of worker tasks
        1,            # number of parameter-server tasks
        script,
        "map_func",   # user-defined function inside add.py
        None,         # properties
        None,         # env_path (virtualenv)
        None,         # zk_conn
        None,         # zk_base_path
        exec_env,
        tbl_env,
        None,         # input table
        None,         # output schema
    )
:param python_file: The python file which is going to be run
:param func: The user-defined function that runs TF training
:param properties: User-defined properties
:param env_path: Path to the virtual env
:param zk_conn: The Zookeeper connection string
:param zk_base_path: The Zookeeper base path
:param stream_env: The StreamExecutionEnvironment. If it's None, this method will create one and execute the job
at the end. Otherwise, caller is responsible to trigger the job execution
:param input_ds: The input DataStream
:param output_row_type: The RowType for the output DataStream. If it's None, a dummy sink will be added to the
output DataStream. Otherwise, caller is responsible to add sink before executing the job.
:return: The output DataStream. Currently it's always of type Row.
"""
tf_config = TFConfig(num_worker, num_ps, python_file, func, properties, env_path, zk_conn, zk_base_path)
if stream_env is None:
stream_env = StreamExecutionEnvironment.get_execution_environment()
if input_ds is not None:
if isinstance(input_ds, DataStreamSource):
input_ds = input_ds._j_datastream_source
else:
input_ds = input_ds._j_datastream
output_ds = get_gateway().jvm.com.alibaba.flink.ml.tensorflow.client.TFUtils.train(stream_env._j_stream_execution_environment,
input_ds,
tf_config.java_config(),
to_java_type_info(output_row_type))
stream_env.execute()
return DataStream(output_ds)