import logging
import os
import shutil

from pyflink.table import DataTypes
from pyflink.table.descriptors import FileSystem, OldCsv, Schema

# t_env (the TableEnvironment), content (the text to count) and tmp_dir are
# assumed to be defined earlier in the original script.
result_path = tmp_dir + '/result'
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

logging.info("Results directory: %s", result_path)

# Register a CSV file-system sink named "Results" with a (word, count) schema.
t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

# Split the input into (word, 1) pairs, aggregate per word and write into the sink.
elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
    .group_by("word") \
    .select("word, count(1) as count") \
    .insert_into("Results")

t_env.execute("word_count")
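
# A minimal sketch (not part of the original job) for inspecting the sink output;
# it assumes the job ran with sink parallelism 1, so result_path is a single CSV
# file rather than a directory of part files.
if os.path.isfile(result_path):
    with open(result_path) as f:
        for line in f:
            word, count = line.strip().split(',')
            print(word, count)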
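
# This second example exercises the flink-ai-extended (TensorFlow on Flink) Python API.
# It assumes setup not shown here: stream_env / table_env, work_num, ps_num, python_file,
# func and the property dict, plus imports such as CsvTableSource, DataTypes and
# TableSchema (e.g. from pyflink.table) and MLCONSTANTS, train and inference from the
# flink-ai-extended package.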
env_path = None
zk_conn = None
zk_base_path = None
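# Exchange rows with the TensorFlow workers as CSV via RowCSVCoding; the
# SYS:csv_encode_types / SYS:csv_decode_types lists describe the per-column
# types and mirror the a..e schema of the "source" table registered below.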
property[MLCONSTANTS.ENCODING_CLASS] = "com.alibaba.flink.ml.operator.coding.RowCSVCoding"
property[MLCONSTANTS.DECODING_CLASS] = "com.alibaba.flink.ml.operator.coding.RowCSVCoding"
inputSb = "INT_32" + "," + "INT_64" + "," + "FLOAT_32" + "," + "FLOAT_64" + "," + "STRING"
property["SYS:csv_encode_types"] = inputSb
property["SYS:csv_decode_types"] = inputSb
# Register the CSV test file as the "source" table and scan it as the input table.
source_file = os.getcwd() + "/../../src/test/resources/input.csv"
table_source = CsvTableSource(source_file,
                              ["a", "b", "c", "d", "e"],
                              [DataTypes.INT(),
                               DataTypes.INT(),
                               DataTypes.FLOAT(),
                               DataTypes.DOUBLE(),
                               DataTypes.STRING()])
table_env.register_table_source("source", table_source)
input_tb = table_env.scan("source")

# Schema of the table produced by the train()/inference() call below.
output_schema = TableSchema(["a", "b", "c", "d", "e"],
                            [DataTypes.INT(),
                             DataTypes.INT(),
                             DataTypes.FLOAT(),
                             DataTypes.DOUBLE(),
                             DataTypes.STRING()])

train(work_num, ps_num, python_file, func, property, env_path, zk_conn, zk_base_path, stream_env, table_env,
      input_tb, output_schema)
# inference(work_num, ps_num, python_file, func, property, env_path, zk_conn, zk_base_path, stream_env, table_env,
#           input_tb, output_schema)
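
# Roughly speaking, train() submits a Flink job that runs `func` from `python_file` on
# work_num TensorFlow worker nodes plus ps_num parameter servers, feeding them the rows
# of input_tb; the commented-out inference() call takes the same arguments for the
# prediction phase.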