Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Creates a table from a table source.
Example:
::
>>> csv_table_source = CsvTableSource(
... csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
>>> table_env.from_table_source(csv_table_source)
:param table_source: The table source used as table.
:type table_source: pyflink.table.TableSource
:return: The result table.
:rtype: pyflink.table.Table
"""
return Table(self._j_tenv.fromTableSource(table_source._j_table_source))
gateway = get_gateway()
j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
if self._is_blink_planner:
PythonTableUtils = gateway.jvm \
.org.apache.flink.table.planner.utils.python.PythonTableUtils
PythonInputFormatTableSource = gateway.jvm \
.org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
else:
PythonTableUtils = gateway.jvm.PythonTableUtils
PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
j_input_format = PythonTableUtils.getInputFormat(
j_objs, row_type_info, execution_config)
j_table_source = PythonInputFormatTableSource(
j_input_format, row_type_info)
return Table(self._j_tenv.fromTableSource(j_table_source))
finally:
os.unlink(temp_file.name)
Reading a table from a registered catalog with escaping. (`Table` is a reserved keyword).
Dots in, for example, a database name must also be escaped.
::
>>> tab = table_env.from_path("catalogName.`db.Name`.`Table`")
:param path: The path of a table API object to scan.
:type path: str
:return: Either a table or a virtual table (i.e. a view).
:rtype: pyflink.table.Table
.. seealso:: :func:`use_catalog`
.. seealso:: :func:`use_database`
"""
return Table(get_method(self._j_tenv, "from")(path))
:func:`~Table.__str__` method is called, for example when it is embedded into a String.
Hence, SQL queries can directly reference a :class:`~pyflink.table.Table` as follows:
::
>>> table = ...
# the table is not registered to the table environment
>>> table_env.sql_query("SELECT * FROM %s" % table)
:param query: The sql query string.
:type query: str
:return: The result table.
:rtype: pyflink.table.Table
"""
j_table = self._j_tenv.sqlQuery(query)
return Table(j_table)
>>> tab = table_env.scan("catalogName", "dbName", "tableName")
:param table_path: The path of the table to scan.
:type table_path: str
:raises: Exception if no table is found using the given table path.
:return: The resulting table.
:rtype: pyflink.table.Table
.. note:: Deprecated in 1.10. Use :func:`from_path` instead.
"""
warnings.warn("Deprecated in 1.10. Use from_path instead.", DeprecationWarning)
gateway = get_gateway()
j_table_paths = utils.to_jarray(gateway.jvm.String, table_path)
j_table = self._j_tenv.scan(j_table_paths)
return Table(j_table)