Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _from_j_state_backend(j_state_backend):
if j_state_backend is None:
return None
gateway = get_gateway()
JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory.MemoryStateBackend
JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem.FsStateBackend
JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBStateBackend
j_clz = j_state_backend.getClass()
if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend)
if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
return MemoryStateBackend(j_memory_state_backend=j_state_backend)
elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
return FsStateBackend(j_fs_state_backend=j_state_backend)
elif get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
else:
return CustomStateBackend(j_state_backend) # users' customized state backend
def _is_instance_of(java_data_type, java_class):
    """
    Ask the Java side whether ``java_data_type`` is an instance of ``java_class``.

    :param java_data_type: The object passed as the second argument to the Java
                           ``TypeUtil.isInstanceOf`` call.
    :param java_class: The class to test against, given as a string, a JavaClass, or a
                       JavaObject. A JavaObject that is not itself an instance of
                       ``Class`` is replaced by the result of its ``getClass()``.
    :return: The result of the Java ``TypeUtil.isInstanceOf`` call.
    :raises TypeError: If ``java_class`` is not a string, a JavaClass, or a JavaObject.
    """
    gateway = get_gateway()

    # ``basestring`` is not a builtin on Python 3; use it when it is available
    # (Python 2, or a module-level alias) and fall back to ``str`` otherwise so
    # this check never fails with a NameError.
    try:
        string_types = basestring  # noqa: F821
    except NameError:
        string_types = str

    if isinstance(java_class, string_types):
        param = java_class
    elif isinstance(java_class, JavaClass):
        param = get_java_class(java_class)
    elif isinstance(java_class, JavaObject):
        if not _is_instance_of(java_class, gateway.jvm.Class):
            param = java_class.getClass()
        else:
            param = java_class
    else:
        raise TypeError(
            "java_class must be a string, a JavaClass, or a JavaObject")

    return gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.isInstanceOf(
        param, java_data_type)
def _is_instance_of(java_data_type, java_class):
    """
    Delegate an instance-of check to the Java ``TypeUtil.isInstanceOf`` helper.

    :param java_data_type: The object handed to the Java call as the value to test.
    :param java_class: A string, a JavaClass, or a JavaObject describing the class to
                       test against.
    :return: Whatever the Java ``TypeUtil.isInstanceOf`` call returns.
    :raises TypeError: If ``java_class`` is none of the three accepted kinds.
    """
    gateway = get_gateway()

    accepted = isinstance(java_class, (str, JavaClass, JavaObject))
    if not accepted:
        raise TypeError(
            "java_class must be a string, a JavaClass, or a JavaObject")

    if isinstance(java_class, str):
        target = java_class
    elif isinstance(java_class, JavaClass):
        target = get_java_class(java_class)
    else:
        # A JavaObject that already is a Class is used as-is; any other object
        # is replaced by its runtime class.
        already_class = _is_instance_of(java_class, gateway.jvm.Class)
        target = java_class if already_class else java_class.getClass()

    type_util = gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil
    return type_util.isInstanceOf(target, java_data_type)
def _from_j_restart_strategy(j_restart_strategy):
if j_restart_strategy is None:
return None
gateway = get_gateway()
NoRestartStrategyConfiguration = gateway.jvm.RestartStrategies\
.NoRestartStrategyConfiguration
FixedDelayRestartStrategyConfiguration = gateway.jvm.RestartStrategies\
.FixedDelayRestartStrategyConfiguration
FailureRateRestartStrategyConfiguration = gateway.jvm.RestartStrategies\
.FailureRateRestartStrategyConfiguration
FallbackRestartStrategyConfiguration = gateway.jvm.RestartStrategies\
.FallbackRestartStrategyConfiguration
clz = j_restart_strategy.getClass()
if clz.getName() == get_java_class(NoRestartStrategyConfiguration).getName():
return RestartStrategies.NoRestartStrategyConfiguration(
j_restart_strategy=j_restart_strategy)
elif clz.getName() == get_java_class(FixedDelayRestartStrategyConfiguration).getName():
return RestartStrategies.FixedDelayRestartStrategyConfiguration(
j_restart_strategy=j_restart_strategy)
elif clz.getName() == get_java_class(FailureRateRestartStrategyConfiguration).getName():
return RestartStrategies.FailureRateRestartStrategyConfiguration(
j_restart_strategy=j_restart_strategy)
elif clz.getName() == get_java_class(FallbackRestartStrategyConfiguration).getName():
return RestartStrategies.FallbackRestartStrategyConfiguration(
j_restart_strategy=j_restart_strategy)
else:
raise Exception("Unsupported java RestartStrategyConfiguration: %s" % clz.getName())
def _from_j_state_backend(j_state_backend):
if j_state_backend is None:
return None
gateway = get_gateway()
JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory.MemoryStateBackend
JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem.FsStateBackend
JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBStateBackend
j_clz = j_state_backend.getClass()
if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend)
if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
return MemoryStateBackend(j_memory_state_backend=j_state_backend)
elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
return FsStateBackend(j_fs_state_backend=j_state_backend)
elif get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
else:
return CustomStateBackend(j_state_backend) # users' customized state backend
def _from_j_state_backend(j_state_backend):
if j_state_backend is None:
return None
gateway = get_gateway()
JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory.MemoryStateBackend
JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem.FsStateBackend
JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBStateBackend
j_clz = j_state_backend.getClass()
if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend)
if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
return MemoryStateBackend(j_memory_state_backend=j_state_backend)
elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
return FsStateBackend(j_fs_state_backend=j_state_backend)
elif get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
else:
return CustomStateBackend(j_state_backend) # users' customized state backend
Because the options are not serializable and hold native code references,
they must be specified through a factory.
The options created by the factory here are applied on top of the pre-defined
options profile selected via :func:`set_predefined_options`.
If the pre-defined options profile is the default (:data:`PredefinedOptions.DEFAULT`),
then the factory fully controls the RocksDB options.
:param options_factory_class_name: The fully-qualified class name of the options
factory in Java that lazily creates the RocksDB options.
The options factory must have a default constructor.
"""
gateway = get_gateway()
JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
j_options_factory_clz = load_java_class(options_factory_class_name)
if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
raise ValueError("The input class not implements OptionsFactory.")
self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
if table_config is not None:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment,
table_config._j_table_config)
elif environment_settings is not None:
if not environment_settings.is_streaming_mode():
raise ValueError("The environment settings for StreamTableEnvironment must be "
"set to streaming mode.")
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment,
environment_settings._j_environment_settings)
else:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment)
j_planner_class = j_tenv.getPlanner().getClass()
j_blink_planner_class = get_java_class(
get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
is_blink_planner = j_blink_planner_class.isAssignableFrom(j_planner_class)
return StreamTableEnvironment(j_tenv, is_blink_planner)
return None
gateway = get_gateway()
JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory.MemoryStateBackend
JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem.FsStateBackend
JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBStateBackend
j_clz = j_state_backend.getClass()
if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend)
if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
return MemoryStateBackend(j_memory_state_backend=j_state_backend)
elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
return FsStateBackend(j_fs_state_backend=j_state_backend)
elif get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
else:
return CustomStateBackend(j_state_backend) # users' customized state backend