Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# ArrayType
elif isinstance(data_type, ArrayType):
return Types.OBJECT_ARRAY(_to_java_type(data_type.element_type))
# MapType
elif isinstance(data_type, MapType):
return Types.MAP(_to_java_type(data_type.key_type), _to_java_type(data_type.value_type))
# MultisetType
elif isinstance(data_type, MultisetType):
return Types.MULTISET(_to_java_type(data_type.element_type))
# RowType
elif isinstance(data_type, RowType):
return Types.ROW(
to_jarray(gateway.jvm.String, data_type.field_names()),
to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(f.data_type) for f in data_type.fields]))
# UserDefinedType
elif isinstance(data_type, UserDefinedType):
if data_type.java_udt():
return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
gateway.jvm.Class.forName(data_type.java_udt()))
else:
return _to_java_type(data_type.sql_type())
else:
raise TypeError("Not supported type: %s" % data_type)
elif isinstance(data_type, ArrayType):
return Types.OBJECT_ARRAY(_to_java_type(data_type.element_type))
# MapType
elif isinstance(data_type, MapType):
return Types.MAP(_to_java_type(data_type.key_type), _to_java_type(data_type.value_type))
# MultisetType
elif isinstance(data_type, MultisetType):
return Types.MULTISET(_to_java_type(data_type.element_type))
# RowType
elif isinstance(data_type, RowType):
return Types.ROW(
to_jarray(gateway.jvm.String, data_type.field_names()),
to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(f.data_type) for f in data_type.fields]))
# UserDefinedType
elif isinstance(data_type, UserDefinedType):
if data_type.java_udt():
return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
gateway.jvm.Class.forName(data_type.java_udt()))
else:
return _to_java_type(data_type.sql_type())
else:
raise TypeError("Not supported type: %s" % data_type)
Computing over window aggregates on a streaming table is only a parallel
operation if the window is partitioned. Otherwise, the whole stream will be processed
by a single task, i.e., with parallelism 1.
.. note::
Over-windows for batch tables are currently not supported.
:param over_windows: over windows created from :class:`~pyflink.table.window.Over`.
:type over_windows: pyflink.table.window.OverWindow
:return: An over windowed table.
:rtype: pyflink.table.OverWindowedTable
"""
gateway = get_gateway()
window_array = to_jarray(gateway.jvm.OverWindow,
[item._java_over_window for item in over_windows])
return OverWindowedTable(self._j_table.window(window_array))
def __init__(self, field_names=None, data_types=None, j_table_schema=None):
    """Wrap a Java-side ``TableSchema``.

    Either adopt an existing Java ``TableSchema`` object (``j_table_schema``),
    or build a fresh one on the JVM from ``field_names`` and ``data_types``.

    :param field_names: names of the schema's fields; ignored when
        ``j_table_schema`` is given.
    :param data_types: Python data types of the fields, converted to Java
        ``TypeInformation``; ignored when ``j_table_schema`` is given.
    :param j_table_schema: an already-constructed Java ``TableSchema`` to wrap.
    """
    if j_table_schema is not None:
        # The Java object already exists — just keep a reference to it.
        self._j_table_schema = j_table_schema
        return
    gw = get_gateway()
    # Marshal the Python names/types into Java arrays for the constructor.
    names_arr = to_jarray(gw.jvm.String, field_names)
    types_arr = to_jarray(
        gw.jvm.TypeInformation,
        [_to_java_type(t) for t in data_types])
    self._j_table_schema = gw.jvm.TableSchema(names_arr, types_arr)
def __init__(self, field_names=None, data_types=None, j_table_schema=None):
    """Create the schema wrapper.

    When ``j_table_schema`` is ``None``, a new Java ``TableSchema`` is
    instantiated from ``field_names`` and ``data_types``; otherwise the
    supplied Java object is wrapped directly.
    """
    if j_table_schema is None:
        gateway = get_gateway()
        # Translate each Python data type to its Java TypeInformation first.
        java_types = [_to_java_type(dt) for dt in data_types]
        self._j_table_schema = gateway.jvm.TableSchema(
            to_jarray(gateway.jvm.String, field_names),
            to_jarray(gateway.jvm.TypeInformation, java_types))
    else:
        self._j_table_schema = j_table_schema