Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create the stream and table environments used by the test case.

    Runs the parent ``setUp`` first, then stores a stream execution
    environment with parallelism 2 on ``self.env`` and a table environment
    built with streaming-mode, Blink-planner settings on ``self.t_env``.
    """
    super(PyFlinkBlinkStreamTableTestCase, self).setUp()

    self.env = StreamExecutionEnvironment.get_execution_environment()
    self.env.set_parallelism(2)

    # Build the settings separately so the create() call stays readable.
    blink_streaming_settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    self.t_env = StreamTableEnvironment.create(
        self.env, environment_settings=blink_streaming_settings
    )
def testWorkerZeroFinish():
    """Invoke ``train`` on ``worker_0_finish.py`` with work_num=3 and ps_num=2.

    A fresh stream execution environment and table environment are created
    for the call; every remaining argument of ``train`` is passed as ``None``.
    """
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(stream_env)

    worker_count, ps_count = 3, 2
    # The script is located relative to the current working directory.
    script_path = os.getcwd() + "/../../src/test/python/worker_0_finish.py"
    entry_point = "map_func"

    # Nothing is supplied for these arguments of train().
    properties = env_path = zk_conn = zk_base_path = None
    input_table = output_schema = None

    train(
        worker_count, ps_count, script_path, entry_point,
        properties, env_path, zk_conn, zk_base_path,
        stream_env, table_env,
        input_table, output_schema,
    )
    # Disabled follow-up call:
    # inference(worker_count, ps_count, script_path, entry_point, properties, env_path, zk_conn, zk_base_path,
    #           stream_env, table_env, ...)
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes
# Sample input text: the pieces below are joined with single spaces into one
# whitespace-separated string.
content = ' '.join((
    'line Licensed to the Apache Software Foundation ASF under one',
    'line or more contributor license agreements See the NOTICE file',
    'line distributed with this work for additional information',
    'line regarding copyright ownership The ASF licenses this file',
    'to you under the Apache License Version the',
    'License you may not use this file except in compliance',
    'with the License',
))

# Table environment on the Blink planner, layered over the stream execution
# environment obtained from get_execution_environment().
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build(),
)

# Path handed to the FileSystem connector further down.
result_path = '/notebooks/output.csv'
print('Results directory:', result_path)
t_env.connect(FileSystem().path(result_path)).with_format(
OldCsv()
.field_delimiter(',')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())
).with_schema(
def addTrainTable():
    """Invoke ``train`` on ``add.py`` with work_num=2 and ps_num=1.

    A fresh stream execution environment and table environment are created
    for the call; every remaining argument of ``train`` is passed as ``None``.
    """
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(stream_env)

    worker_count, ps_count = 2, 1
    # The script is located relative to the current working directory.
    script_path = os.getcwd() + "/../../src/test/python/add.py"
    entry_point = "map_func"

    # Nothing is supplied for these arguments of train().
    properties = env_path = zk_conn = zk_base_path = None
    input_table = output_schema = None

    train(
        worker_count, ps_count, script_path, entry_point,
        properties, env_path, zk_conn, zk_base_path,
        stream_env, table_env,
        input_table, output_schema,
    )
    # Disabled follow-up call:
    # inference(worker_count, ps_count, script_path, entry_point, properties, env_path, zk_conn, zk_base_path,
    #           stream_env, table_env, ...)