Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepare a batch-mode table environment that uses the Blink planner.

    After the parent ``setUp`` has run, ``self.t_env`` holds a
    ``BatchTableEnvironment`` and the execution environment obtained from
    its planner is set to parallelism 2.
    """
    super(PyFlinkBlinkBatchTableTestCase, self).setUp()

    # Assemble the settings step by step: batch mode first, then the planner.
    settings_builder = EnvironmentSettings.new_instance()
    settings_builder = settings_builder.in_batch_mode()
    settings_builder = settings_builder.use_blink_planner()
    batch_settings = settings_builder.build()

    self.t_env = BatchTableEnvironment.create(
        environment_settings=batch_settings)

    # Parallelism is set through the private ``_j_tenv`` handle (presumably
    # the wrapped Java table environment -- confirm) rather than a Python API.
    planner_exec_env = self.t_env._j_tenv.getPlanner().getExecEnv()
    planner_exec_env.setParallelism(2)
def setUp(self):
    """Prepare a streaming-mode table environment that uses the Blink planner.

    After the parent ``setUp`` has run, ``self.env`` holds a
    ``StreamExecutionEnvironment`` with parallelism 2 and ``self.t_env``
    holds a ``StreamTableEnvironment`` created on top of it.
    """
    super(PyFlinkBlinkStreamTableTestCase, self).setUp()

    stream_env = StreamExecutionEnvironment.get_execution_environment()
    stream_env.set_parallelism(2)
    self.env = stream_env

    # Assemble the settings step by step: streaming mode, then the planner.
    settings_builder = EnvironmentSettings.new_instance()
    settings_builder = settings_builder.in_streaming_mode()
    settings_builder = settings_builder.use_blink_planner()
    streaming_settings = settings_builder.build()

    self.t_env = StreamTableEnvironment.create(
        self.env, environment_settings=streaming_settings)
from pyflink.table.types import DataTypes
# Text literal assembled from adjacent string pieces; every piece except the
# last ends with a space so the words stay separated after concatenation.
content = (
    'line Licensed to the Apache Software Foundation ASF under one '
    'line or more contributor license agreements See the NOTICE file '
    'line distributed with this work for additional information '
    'line regarding copyright ownership The ASF licenses this file '
    'to you under the Apache License Version the '
    'License you may not use this file except in compliance '
    'with the License'
)

# Stream execution environment plus a table environment on top of it,
# configured to use the Blink planner.
env = StreamExecutionEnvironment.get_execution_environment()
blink_settings = EnvironmentSettings.new_instance().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=blink_settings)

# Path that is printed here and later passed to the FileSystem connector.
result_path = '/notebooks/output.csv'
print('Results directory:', result_path)
t_env.connect(FileSystem().path(result_path)).with_format(
OldCsv()
.field_delimiter(',')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())
).with_schema(
Schema()
.field('word', DataTypes.STRING())