timestampsAndWatermarks,
- String sourceName)
+ String sourceName);
```
`TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 `SourceOutput`)的一部分透明地运行,因此 Source 实现者不必实现任何时间戳提取和水印生成的代码。
diff --git a/docs/content.zh/docs/dev/datastream/testing.md b/docs/content.zh/docs/dev/datastream/testing.md
index c8e709366665e..5fdd52c746c3d 100644
--- a/docs/content.zh/docs/dev/datastream/testing.md
+++ b/docs/content.zh/docs/dev/datastream/testing.md
@@ -151,11 +151,7 @@ class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
* `TwoInputStreamOperatorTestHarness` (适用于两个 `DataStream` 的 `ConnectedStreams` 算子)
* `KeyedTwoInputStreamOperatorTestHarness` (适用于两个 `KeyedStream` 上的 `ConnectedStreams` 算子)
-要使用测试工具,还需要一组其他的依赖项(测试范围)。
-
-{{< artifact flink-test-utils withTestScope >}}
-{{< artifact flink-runtime withTestScope >}}
-{{< artifact flink-streaming-java withTestScope withTestClassifier >}}
+要使用测试工具,还需要一组其他的依赖项,请查阅[配置]({{< ref "docs/dev/configuration/testing" >}})小节了解更多细节。
现在,可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。
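+
+下面给出一个简化的使用示意(假设沿用前文将输入加一的 `IncrementFlatMapFunction`;测试类名与断言方式仅作示例,实际请以你使用的测试框架为准):
+
+```java
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+public class IncrementFlatMapFunctionHarnessTest {
+
+    @Test
+    public void testIncrement() throws Exception {
+        // 将用户自定义函数包装成算子,并交给测试工具
+        OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new StreamFlatMap<>(new IncrementFlatMapFunction()));
+
+        testHarness.open();
+
+        // 推送一条带时间戳的记录,并推进 watermark
+        testHarness.processElement(2L, 100L);
+        testHarness.processWatermark(100L);
+
+        // 校验算子的输出
+        assertEquals(Collections.singletonList(3L), testHarness.extractOutputValues());
+    }
+}
+```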
diff --git a/docs/content.zh/docs/dev/datastream/user_defined_functions.md b/docs/content.zh/docs/dev/datastream/user_defined_functions.md
index c8c0f3b2b18dd..652fb5a6d2966 100644
--- a/docs/content.zh/docs/dev/datastream/user_defined_functions.md
+++ b/docs/content.zh/docs/dev/datastream/user_defined_functions.md
@@ -209,7 +209,7 @@ this.numLines.add(1);
最终整体结果会存储在由执行环境的 `execute()` 方法返回的 ```JobExecutionResult``` 对象中(当前只有等待作业完成后执行才起作用)。
```java
-myJobExecutionResult.getAccumulatorResult("num-lines")
+myJobExecutionResult.getAccumulatorResult("num-lines");
```
单个作业的所有累加器共享一个命名空间。因此你可以在不同的操作 function 里面使用同一个累加器。Flink 会在内部将所有具有相同名称的累加器合并起来。
diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
index 4605c8b750b58..5674e6e0fb6a0 100644
--- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -78,7 +78,7 @@ def state_access_demo():
# 3. define the execution logic
ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
.key_by(lambda a: a[0]) \
- .map(MyMapFunction(), output_type=Types.ROW([Types.LONG(), Types.LONG()]))
+ .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))
# 4. create sink and emit result to sink
output_path = '/opt/output/'
diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md
index 2d54424acee47..0cefc596b59a8 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -116,7 +116,7 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl
你可以打开profile来分析性能瓶颈
```python
-t_env.get_config().get_configuration().set_boolean("python.profile.enabled", True)
+t_env.get_config().set("python.profile.enabled", "true")
```
你可以在[日志](#查看日志)里面查看profile的结果
diff --git a/docs/content.zh/docs/dev/python/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md
index c8186865136b5..52b584548a8ab 100644
--- a/docs/content.zh/docs/dev/python/dependency_management.md
+++ b/docs/content.zh/docs/dev/python/dependency_management.md
@@ -51,12 +51,15 @@ If third-party JARs are used, you can specify the JARs in the Python Table API a
# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"
# and will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
-table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+
+# It looks like the following on Windows:
+table_env.get_config().set("pipeline.jars", "file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")
# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";"
# and will be added to the classpath during job execution.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
-table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
```
or in the Python DataStream API as following:
@@ -66,6 +69,9 @@ or in the Python DataStream API as following:
# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
+# It looks like the following on Windows:
+stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar", "file:///E:/my/jar/path/connector2.jar")
+
# Use the add_classpaths() to add the dependent jars URLs into the classpath.
# The URLs will also be added to the classpath of both the client and the cluster.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the
diff --git a/docs/content.zh/docs/dev/python/faq.md b/docs/content.zh/docs/dev/python/faq.md
index 54faf6b9a15c2..939f6b4a441b6 100644
--- a/docs/content.zh/docs/dev/python/faq.md
+++ b/docs/content.zh/docs/dev/python/faq.md
@@ -64,10 +64,10 @@ PyFlink作业可能依赖jar文件,比如connector,Java UDF等。
```python
# 注意:仅支持本地文件URL(以"file:"开头)。
-table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
# 注意:路径必须指定协议(例如:文件——"file"),并且用户应确保在客户端和群集上都可以访问这些URL。
-table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
```
有关添加Java依赖项的API的详细信息,请参阅[相关文档]({{< ref "docs/dev/python/dependency_management" >}}#java-dependency-in-python-program)。
diff --git a/docs/content.zh/docs/dev/python/python_config.md b/docs/content.zh/docs/dev/python/python_config.md
index 664fd8594cb8f..8c72def7a5195 100644
--- a/docs/content.zh/docs/dev/python/python_config.md
+++ b/docs/content.zh/docs/dev/python/python_config.md
@@ -49,9 +49,23 @@ from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
+t_env.get_config().set("python.fn-execution.bundle.size", "1000")
+```
-config = t_env.get_config().get_configuration()
-config.set_integer("python.fn-execution.bundle.size", 1000)
+The config options could also be set when creating EnvironmentSettings:
+```python
+from pyflink.common import Configuration
+from pyflink.table import TableEnvironment, EnvironmentSettings
+
+# create a streaming TableEnvironment
+config = Configuration()
+config.set_string("python.fn-execution.bundle.size", "1000")
+env_settings = EnvironmentSettings \
+ .new_instance() \
+ .in_streaming_mode() \
+ .with_configuration(config) \
+ .build()
+table_env = TableEnvironment.create(env_settings)
```
## Python Options
diff --git a/docs/content.zh/docs/dev/python/python_execution_mode.md b/docs/content.zh/docs/dev/python/python_execution_mode.md
index c34da240a20ba..dae060ec65c0a 100644
--- a/docs/content.zh/docs/dev/python/python_execution_mode.md
+++ b/docs/content.zh/docs/dev/python/python_execution_mode.md
@@ -31,61 +31,48 @@ defines how to execute your customized Python functions.
Prior to release-1.15, there was only one execution mode, called the `PROCESS` execution mode. The `PROCESS`
mode means that the Python user-defined functions will be executed in separate Python processes.
-In release-1.15, it has introduced another two execution modes called `MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will be executed in Python
-different sub-interpreters rather than different threads of one interpreter, which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support it, such as numpy, tensorflow, etc.
+In release-1.15, a new execution mode called `THREAD` execution mode was introduced. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same process as the Java operator.
+It should be noted that multiple Python user-defined functions running in the same JVM are still affected by the GIL.
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER execution mode?
+## When can/should I use THREAD execution mode?
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` mode is to overcome the
-overhead of serialization/deserialization and network communication caused in `PROCESS` mode.
-So if performance is not your concern, or the computing logic of your customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode can largely overcome the
-effects of the GIL, so you can get better performance usually. However, `SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use `PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead of serialization/deserialization
+and network communication incurred in `PROCESS` mode. So if performance is not your concern, or if the computing
+logic of your customized Python functions is the performance bottleneck of the job, `PROCESS` mode will
+be the best choice, as `PROCESS` mode provides better isolation than `THREAD` mode.
## Configuring Python execution mode
The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
- `PROCESS`: The Python user-defined functions will be executed in separate Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same process as the Java operator.
You could specify the Python execution mode using the Python Table API as follows:
```python
# Specify `PROCESS` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "process")
-
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
+table_env.get_config().set("python.execution-mode", "process")
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().set("python.execution-mode", "thread")
```
{{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, it's actually executed in `PROCESS` execution mode.
+Currently, executing Python UDFs in `THREAD` execution mode is not yet supported in all places.
+In those cases, it will fall back to `PROCESS` execution mode. So it may happen that you configure a job
+to execute in `THREAD` execution mode while it is actually executed in `PROCESS` execution mode.
{{< /hint >}}
{{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` execution mode only supports Python 3.8+.
+`THREAD` execution mode is only supported in Python 3.7+.
{{< /hint >}}
## Execution Behavior
-This section provides an overview of the execution behavior of `MULTI-THREAD` and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` execution mode and contrasts
+it with `PROCESS` execution mode. For more details, please refer to the FLIP that introduced this feature:
[FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
#### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python worker process using vari
{{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution Mode" >}}
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed in the same process
+as the Java operators. PyFlink makes use of the third-party library [PEMJA](https://github.com/alibaba/pemja)
+to embed Python in the Java application.
{{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution Mode" >}}
diff --git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
index fa3d4a734ab09..6e36a879b59de 100644
--- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
+++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
@@ -29,9 +29,7 @@ under the License.
本文档是对 PyFlink Table API 的简要介绍,用于帮助新手用户快速理解 PyFlink Table API 的基本用法。
关于高级用法,请参阅用户指南中的其他文档。
-
-
-Python Table API 程序的基本结构
+Python Table API 程序的基本结构
--------------------------------------------
所有的 Table API 和 SQL 程序,不管批模式,还是流模式,都遵循相同的结构。下面代码示例展示了 Table API 和 SQL 程序的基本结构。
diff --git a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
index 8f786b5fd15ad..dbf94fbd1eee9 100644
--- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
@@ -36,7 +36,7 @@ under the License.
要在 PyFlink 作业中使用,首先需要将其指定为作业的 [依赖]({{< ref "docs/dev/python/dependency_management" >}})。
```python
-table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
+table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
```
## 如何使用连接器
@@ -86,7 +86,7 @@ def log_processing():
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# specify connector and format jars
- t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
+ t_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
source_ddl = """
CREATE TABLE source_table(
diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md
index bf65cdf050eae..641540b20c1bc 100644
--- a/docs/content.zh/docs/dev/python/table/table_environment.md
+++ b/docs/content.zh/docs/dev/python/table/table_environment.md
@@ -35,12 +35,18 @@ under the License.
创建 `TableEnvironment` 的推荐方式是通过 `EnvironmentSettings` 对象创建:
```python
+from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
-env_settings = EnvironmentSettings.in_streaming_mode()
-# or a batch TableEnvironment
-# env_settings = EnvironmentSettings.in_batch_mode()
+config = Configuration()
+config.set_string('execution.buffer-timeout', '1 min')
+env_settings = EnvironmentSettings \
+ .new_instance() \
+ .in_streaming_mode() \
+ .with_configuration(config) \
+ .build()
+
table_env = TableEnvironment.create(env_settings)
```
@@ -562,12 +568,9 @@ TableEnvironment API
返回 table config,可以通过 table config 来定义 Table API 的运行时行为。
你可以在 <a href="{{< ref "docs/deployment/config" >}}">配置</a> 和
<a href="{{< ref "docs/dev/python/python_config" >}}">Python 配置</a> 中找到所有可用的配置选项。
- 下面的代码示例展示了如何通过这个 API 来设置配置选项:
-```python
-# set the parallelism to 8
-table_env.get_config().get_configuration().set_string(
- "parallelism.default", "8")
-```
+ 下面的代码示例展示了如何通过这个 API 来设置配置选项:
+```python
+# set the parallelism to 8
+table_env.get_config().set("parallelism.default", "8")
+```
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_config" name="链接">}}
@@ -816,19 +819,19 @@ Statebackend,Checkpoint 以及重启策略
下面代码示例展示了如何通过 Table API 来配置 statebackend,checkpoint 以及重启策略:
```python
# 设置重启策略为 "fixed-delay"
-table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
-table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
-table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")
+table_env.get_config().set("restart-strategy", "fixed-delay")
+table_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
+table_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
# 设置 checkpoint 模式为 EXACTLY_ONCE
-table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
-table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")
+table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().set("execution.checkpointing.interval", "3min")
# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager"
# 你也可以将这个属性设置为 StateBackendFactory 的完整类名
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
-table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")
+table_env.get_config().set("state.backend", "rocksdb")
# 设置 RocksDB statebackend 所需要的 checkpoint 目录
-table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")
+table_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
```
diff --git a/docs/content.zh/docs/dev/python/table_api_tutorial.md b/docs/content.zh/docs/dev/python/table_api_tutorial.md
index 7b1f5db8cfbf7..dfe9c78e341a6 100644
--- a/docs/content.zh/docs/dev/python/table_api_tutorial.md
+++ b/docs/content.zh/docs/dev/python/table_api_tutorial.md
@@ -66,7 +66,7 @@ $ python -m pip install apache-flink
```python
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
-t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+t_env.get_config().set("parallelism.default", "1")
```
接下来,我们将介绍如何创建源表和结果表。
@@ -200,7 +200,7 @@ word_count_data = ["To be, or not to be,--that is the question:--",
def word_count(input_path, output_path):
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# write all the data to one file
- t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+ t_env.get_config().set("parallelism.default", "1")
# define the source
if input_path is not None:
diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md
index f12f80553e72c..0b86fbd6769a2 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -70,7 +70,7 @@ Catalog 是可扩展的,用户可以通过实现 `Catalog` 接口来开发自
{{< tabs "88ed733a-cf54-4676-9685-7d77d3cc9771" >}}
{{< tab "Java" >}}
```java
-TableEnvironment tableEnv = ...
+TableEnvironment tableEnv = ...;
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "");
diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index a1d5539a07195..02f06056a7fb8 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -53,7 +53,7 @@ tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datag
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
- .build())
+ .build());
// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");
@@ -65,7 +65,7 @@ Table table2 = tableEnv.from("SourceTable");
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// Emit a Table API result Table to a TableSink, same for SQL result
-TableResult tableResult = table2.executeInsert("SinkTable");
+TableResult tableResult = table2.insertInto("SinkTable").execute();
```
{{< /tab >}}
{{< tab "Scala" >}}
@@ -95,7 +95,7 @@ val table1 = tableEnv.from("SourceTable")
val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
// Emit a Table API result Table to a TableSink, same for SQL result
-val tableResult = table1.executeInsert("SinkTable")
+val tableResult = table1.insertInto("SinkTable").execute()
```
{{< /tab >}}
{{< tab "Python" >}}
@@ -335,7 +335,7 @@ tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// Using SQL DDL
-tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
```
@@ -645,7 +645,9 @@ Table API 和 SQL 查询的混用非常简单因为它们都返回 `Table` 对
请参考文档 [Table Sources & Sinks]({{< ref "docs/dev/table/sourcesSinks" >}}) 以获取更多关于可用 Sink 的信息以及如何自定义 `DynamicTableSink`。
-方法 `Table.executeInsert(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
+方法 `Table.insertInto(String tableName)` 定义了一个完整的端到端管道,将源表中的数据传输到一个已注册的输出表中。
+该方法通过名称在 catalog 中查找输出表并确认 `Table` schema 和输出表 schema 一致。
+可以通过方法 `TablePipeline.explain()` 和 `TablePipeline.execute()` 分别来解释和执行一个数据流管道。
下面的示例演示如何输出 `Table`:
@@ -671,10 +673,16 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
.build());
// compute a result Table using Table API operators and/or SQL queries
-Table result = ...
+Table result = ...;
+
+// Prepare the insert into pipeline
+TablePipeline pipeline = result.insertInto("CsvSinkTable");
+
+// Print explain details
+pipeline.printExplain();
// emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable");
+pipeline.execute();
```
{{< /tab >}}
@@ -701,8 +709,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
+// Prepare the insert into pipeline
+val pipeline = result.insertInto("CsvSinkTable")
+
+// Print explain details
+pipeline.printExplain()
+
// emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable")
+pipeline.execute()
```
{{< /tab >}}
@@ -752,9 +766,9 @@ result.execute_insert("CsvSinkTable")
Table API 或者 SQL 查询在下列情况下会被翻译:
* 当 `TableEnvironment.executeSql()` 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
-* 当 `Table.executeInsert()` 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `TablePipeline.execute()` 被调用时。该方法是用来执行一个源表到输出表的数据流,一旦该方法被调用, TABLE API 程序立即被翻译。
* 当 `Table.execute()` 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
-* 当 `StatementSet.execute()` 被调用时。`Table` (通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
+* 当 `StatementSet.execute()` 被调用时。`TablePipeline` (通过 `StatementSet.add()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
* 当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 集成](#integration-with-datastream))。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
{{< top >}}
@@ -910,10 +924,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.addInsert("MySink1", table1);
+stmtSet.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.addInsert("MySink2", table2);
+stmtSet.add(table2.insertInto("MySink2"));
String explanation = stmtSet.explain();
System.out.println(explanation);
@@ -954,10 +968,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
val stmtSet = tEnv.createStatementSet()
val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-stmtSet.addInsert("MySink1", table1)
+stmtSet.add(table1.insertInto("MySink1"))
val table2 = table1.unionAll(tEnv.from("MySource2"))
-stmtSet.addInsert("MySink2", table2)
+stmtSet.add(table2.insertInto("MySink2"))
val explanation = stmtSet.explain()
println(explanation)
diff --git a/docs/content.zh/docs/dev/table/concepts/overview.md b/docs/content.zh/docs/dev/table/concepts/overview.md
index 53ef0dc3a7e3d..ed6561c8638a8 100644
--- a/docs/content.zh/docs/dev/table/concepts/overview.md
+++ b/docs/content.zh/docs/dev/table/concepts/overview.md
@@ -34,106 +34,90 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 和 [SQL]({{< ref "
下面这些页面包含了概念、实际的限制,以及流式数据处理中的一些特定的配置。
-State Management
+
+
+状态管理
----------------
+流模式下运行的表程序利用了 Flink 作为有状态流处理器的所有能力。
-Table programs that run in streaming mode leverage all capabilities of Flink as a stateful stream
-processor.
+事实上,一个表程序(Table program)可以配置一个 [state backend]({{< ref "docs/ops/state/state_backends" >}})
+和多个不同的 [checkpoint 选项]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}})
+以满足不同的状态大小和容错需求。还可以对正在运行的 Table API & SQL 管道(pipeline)生成 savepoint,并在之后用它恢复应用程序的状态。
-In particular, a table program can be configured with a [state backend]({{< ref "docs/ops/state/state_backends" >}})
-and various [checkpointing options]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}})
-for handling different requirements regarding state size and fault tolerance. It is possible to take
-a savepoint of a running Table API & SQL pipeline and to restore the application's state at a later
-point in time.
+
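+例如,下面是一个通过 `TableConfig` 配置这些选项的简单示意(仅作说明,所用配置键与取值请以配置文档为准):
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+// 选择 state backend,并开启周期性 checkpoint
+tEnv.getConfig().set("state.backend", "rocksdb");
+tEnv.getConfig().set("state.checkpoints.dir", "file:///tmp/checkpoints/");
+tEnv.getConfig().set("execution.checkpointing.interval", "3min");
+```
+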
-### State Usage
+### 状态使用
-Due to the declarative nature of Table API & SQL programs, it is not always obvious where and how much
-state is used within a pipeline. The planner decides whether state is necessary to compute a correct
-result. A pipeline is optimized to claim as little state as possible given the current set of optimizer
-rules.
+由于 Table API & SQL 程序是声明式的,状态会在管道中的哪个位置使用、使用多少,并不总是显而易见。Planner 会确认是否需要状态才能得到正确的计算结果,
+并在现有优化规则集的约束下,把管道优化成尽可能少地使用状态。
{{< hint info >}}
-Conceptually, source tables are never kept entirely in state. An implementer deals with logical tables
-(i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). Their state requirements
-depend on the used operations.
+从概念上讲,源表从来不会完整地保存在状态中。实现者处理的是逻辑表(即[动态表]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}))。
+它们的状态需求取决于所使用的操作。
{{< /hint >}}
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state abstractions are used.
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段投影或过滤的查询通常是无状态的管道。然而诸如 join、
+聚合或去重等操作需要把中间结果保存在容错存储中,Flink 的状态抽象正是用于此。
{{< hint info >}}
-Please refer to the individual operator documentation for more details about how much state is required
-and how to limit a potentially ever-growing state size.
+关于各算子需要多少状态、以及如何限制可能不断增长的状态大小,请参考相应算子的文档。
{{< /hint >}}
-For example, a regular SQL join of two tables requires the operator to keep both input tables in state
-entirely. For correct SQL semantics, the runtime needs to assume that a matching could occur at any
-point in time from both sides. Flink provides [optimized window and interval joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
-that aim to keep the state size small by exploiting the concept of [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
+例如,对两个表做普通的 SQL join 需要算子把两个输入表完整地保存在状态中。为了保证 SQL 语义的正确性,运行时需要假设两侧在任意时间点都可能发生匹配。
+Flink 提供了[优化的窗口和时间区间 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}),
+利用 [watermark]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 的概念来保持较小的状态规模。
-Another example is the following query that computes the number of clicks per session.
+另一个例子是下面这个计算每个会话点击次数的查询语句:
```sql
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
```
-The `sessionId` attribute is used as a grouping key and the continuous query maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of time. However, the
-continuous query cannot know about this property of `sessionId` and expects that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed `sessionId` value.
-Consequently, the total state size of the query is continuously growing as more and more `sessionId`
-values are observed.
+`sessionId` 属性被用作分组键,连续查询(Continuous Query)为每个观察到的 `sessionId` 维护一个计数。`sessionId` 属性会随着时间不断演变,
+且 `sessionId` 的值只在会话结束前活跃(即只在有限的时间周期内活跃)。然而连续查询无法得知 `sessionId` 的这个性质,
+它预期每个 `sessionId` 值都可能在任何时间点出现,因此会为每个观察到的 `sessionId` 值维护一个计数。于是,查询的总状态量会随着观察到的 `sessionId` 越来越多而持续增长。
+
+
+
+#### 空闲状态维持时间
-#### Idle State Retention Time
+*空闲状态维持时间(Idle State Retention Time)*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+定义了某个键的状态在未被更新的情况下可以保留多长时间,超过该时间后就会被移除。在之前的查询例子中,某个 `sessionId` 的计数一旦在配置的时间段内未被更新,就会被移除。
-The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
-defines for how long the state of a key is retained without being updated before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as soon as it has not
-been updated for the configured period of time.
+移除某个键的状态后,连续查询就会完全忘记它曾经见过这个键。如果之后处理的记录的键的状态此前已被移除,这条记录将被视为
+对应键的第一条记录。对上述例子而言,这意味着 `sessionId` 的计数会重新从 `0` 开始。
-By removing the state of a key, the continuous query completely forgets that it has seen this key
-before. If a record with a key, whose state has been removed before, is processed, the record will
-be treated as if it was the first record with the respective key. For the example above this means
-that the count of a `sessionId` would start again at `0`.
+
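+例如,下面是一个设置该参数的简单示意(假设通过 `TableConfig` 以字符串形式设置,时长格式请以配置文档为准):
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+// 某个键(例如上文的某个 sessionId)的状态在 1 小时内未被更新时将被移除
+tEnv.getConfig().set("table.exec.state.ttl", "1 h");
+```
+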
-### Stateful Upgrades and Evolution
+### 状态化更新与演化
-Table programs that are executed in streaming mode are intended as *standing queries* which means they
-are defined once and are continuously evaluated as static end-to-end pipelines.
+在流模式下执行的表程序被设计为*常驻查询(standing queries)*,这意味着它们只被定义一次,之后作为静态的端到端(end-to-end)管道被持续地执行。
-In case of stateful pipelines, any change to both the query or Flink's planner might lead to a completely
-different execution plan. This makes stateful upgrades and the evolution of table programs challenging
-at the moment. The community is working on improving those shortcomings.
+对于有状态的管道,无论是对查询还是对 Flink Planner 的改动,都有可能产生完全不同的执行计划。这使得表程序的有状态升级和演化目前
+仍具有挑战性,社区正致力于改进这些不足。
-For example, by adding a filter predicate, the optimizer might decide to reorder joins or change the
-schema of an intermediate operator. This prevents restoring from a savepoint due to either changed
-topology or different column layout within the state of an operator.
+例如,添加一个过滤谓词后,优化器可能决定对 join 重新排序,或者改变中间算子的 schema。由于拓扑发生了变化或算子状态内的列布局不同,
+这会导致无法从 savepoint 恢复。
-The query implementer must ensure that the optimized plans before and after the change are compatible.
-Use the `EXPLAIN` command in SQL or `table.explain()` in Table API to [get insights]({{< ref "docs/dev/table/common" >}}#explaining-a-table).
+查询的实现者必须确保改动前后优化出的计划是相互兼容的。可以在 SQL 中使用 `EXPLAIN` 命令,或在 Table API 中使用 `table.explain()`
+来[查看计划详情]({{< ref "docs/dev/table/common" >}}#explaining-a-table)。
-Since new optimizer rules are continuously added, and operators become more efficient and specialized,
-also the upgrade to a newer Flink version could lead to incompatible plans.
+由于新的优化器规则在不断地被加入,算子也变得更加高效和专用,因此升级到较新的 Flink 版本也可能导致不兼容的计划。
{{< hint warning >}}
-Currently, the framework cannot guarantee that state can be mapped from a savepoint to a new table
-operator topology.
+当前框架无法保证状态可以从 savepoint 映射到新的算子拓扑上。
-In other words: Savepoints are only supported if both the query and the Flink version remain constant.
+换言之:只有在查询语句和 Flink 版本都保持不变的情况下,Savepoint 才被支持。
{{< /hint >}}
-Since the community rejects contributions that modify the optimized plan and the operator topology
-in a patch version (e.g. from `1.13.1` to `1.13.2`), it should be safe to upgrade a Table API & SQL
-pipeline to a newer bug fix release. However, major-minor upgrades from (e.g. from `1.12` to `1.13`)
-are not supported.
+由于社区拒绝在补丁版本(如 `1.13.1` 至 `1.13.2`)之间修改优化计划和算子拓扑的贡献,将 Table API & SQL 管道
+升级到新的 bug fix 发行版应当是安全的。然而主/次(major-minor)版本之间的升级(如从 `1.12` 至 `1.13`)不被支持。
+
+针对这两个不足(即修改了查询语句或升级了 Flink 版本),我们建议先调查升级后的表程序的状态能否在切换到实时数据之前,用历史数据重新"预热"
+(即初始化)。Flink 社区正致力于[混合源]({{< ref "docs/connectors/datastream/hybridsource" >}}),以便让这种切换尽可能方便。
+
-For both shortcomings (i.e. modified query and modified Flink version), we recommend to investigate
-whether the state of an updated table program can be "warmed up" (i.e. initialized) with historical
-data again before switching to real-time data. The Flink community is working on a [hybrid source]({{< ref "docs/connectors/datastream/hybridsource" >}})
-to make this switching as convenient as possible.
+
接下来?
-----------------
diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md
index 70939a9f8d74f..07f69c7a76a54 100644
--- a/docs/content.zh/docs/dev/table/config.md
+++ b/docs/content.zh/docs/dev/table/config.md
@@ -46,7 +46,7 @@ Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可
{{< tab "Java" >}}
```java
// instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
// access flink configuration
TableConfig configuration = tEnv.getConfig();
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index ff12448050eec..9e3078505fc86 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -467,6 +467,8 @@ import org.apache.flink.table.api.bridge.scala._
{{< /tab >}}
{{< /tabs >}}
+请查阅[配置]({{< ref "docs/dev/configuration/overview" >}})小节了解更多细节。
+
### Configuration
The `TableEnvironment` will adopt all configuration options from the passed `StreamExecutionEnvironment`.
@@ -596,25 +598,25 @@ pipeline or a statement set:
```java
// execute with explicit sink
-tableEnv.from("InputTable").executeInsert("OutputTable")
+tableEnv.from("InputTable").insertInto("OutputTable").execute();
-tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
+tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
tableEnv.createStatementSet()
- .addInsert("OutputTable", tableEnv.from("InputTable"))
- .addInsert("OutputTable2", tableEnv.from("InputTable"))
- .execute()
+ .add(tableEnv.from("InputTable").insertInto("OutputTable"))
+ .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
+ .execute();
tableEnv.createStatementSet()
.addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
.addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
- .execute()
+ .execute();
// execute with implicit local sink
-tableEnv.from("InputTable").execute().print()
+tableEnv.from("InputTable").execute().print();
-tableEnv.executeSql("SELECT * FROM InputTable").print()
+tableEnv.executeSql("SELECT * FROM InputTable").print();
```
To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
@@ -627,17 +629,17 @@ these "external parts".
// (1)
// adds a branch with a printing sink to the StreamExecutionEnvironment
-tableEnv.toDataStream(table).print()
+tableEnv.toDataStream(table).print();
// (2)
// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
-table.execute().print()
+table.execute().print();
// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
-env.execute()
+env.execute();
```
{{< top >}}
@@ -2560,12 +2562,12 @@ TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
// add a pure Table API pipeline
Table tableFromSource = tableEnv.from(sourceDescriptor);
-statementSet.addInsert(sinkDescriptor, tableFromSource);
+statementSet.add(tableFromSource.insertInto(sinkDescriptor));
// use table sinks for the DataStream API pipeline
DataStream dataStream = env.fromElements(1, 2, 3);
Table tableFromStream = tableEnv.fromDataStream(dataStream);
-statementSet.addInsert(sinkDescriptor, tableFromStream);
+statementSet.add(tableFromStream.insertInto(sinkDescriptor));
// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
@@ -2611,12 +2613,12 @@ val sinkDescriptor = TableDescriptor.forConnector("print").build
// add a pure Table API pipeline
val tableFromSource = tableEnv.from(sourceDescriptor)
-statementSet.addInsert(sinkDescriptor, tableFromSource)
+statementSet.add(tableFromSource.insertInto(sinkDescriptor))
// use table sinks for the DataStream API pipeline
val dataStream = env.fromElements(1, 2, 3)
val tableFromStream = tableEnv.fromDataStream(dataStream)
-statementSet.addInsert(sinkDescriptor, tableFromStream)
+statementSet.add(tableFromStream.insertInto(sinkDescriptor))
// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared calling this method)
@@ -2772,7 +2774,7 @@ In particular, these parts might not be well integrated into many recent new fea
{{< tab "Java" >}}
```java
StreamTableEnvironment tableEnv = ...;
-DataStream> stream = ...
+DataStream> stream = ...;
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
```
@@ -2908,7 +2910,7 @@ Flink 的 DataStream API 支持多样的数据类型。
```java
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
-DataStream> stream = ...
+DataStream> stream = ...;
// convert DataStream into Table with field "myLong" only
Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -2960,7 +2962,7 @@ table = t_env.from_data_stream(stream, col('my_long'), col('my_int'))
```java
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-DataStream> stream = ...
+DataStream> stream = ...;
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, $("f1"));
@@ -3023,7 +3025,7 @@ Flink 将基础数据类型(`Integer`、`Double`、`String`)或者通用数
```java
StreamTableEnvironment tableEnv = ...;
-DataStream stream = ...
+DataStream stream = ...;
// Convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -3081,7 +3083,7 @@ tuple 的 DataStream 都能被转换成表。
```java
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-DataStream> stream = ...
+DataStream> stream = ...;
// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
@@ -3176,7 +3178,7 @@ Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Person is a POJO with fields "name" and "age"
-DataStream stream = ...
+DataStream stream = ...;
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));
@@ -3225,7 +3227,7 @@ Row 类型的字段映射支持基于名称和基于位置两种方式。
StreamTableEnvironment tableEnv = ...;
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
-DataStream stream = ...
+DataStream stream = ...;
// Convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));
diff --git a/docs/content.zh/docs/dev/table/functions/systemFunctions.md b/docs/content.zh/docs/dev/table/functions/systemFunctions.md
index 3a660189fd253..69ab489a9625e 100644
--- a/docs/content.zh/docs/dev/table/functions/systemFunctions.md
+++ b/docs/content.zh/docs/dev/table/functions/systemFunctions.md
@@ -206,7 +206,7 @@ Known Limitations:
```java
table
.groupBy("withColumns(1 to 3)")
- .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+ .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))");
```
{{< /tab >}}
{{< tab "Scala" >}}
diff --git a/docs/content.zh/docs/dev/table/overview.md b/docs/content.zh/docs/dev/table/overview.md
index b3038cb0814dd..3365f0d0dafa6 100644
--- a/docs/content.zh/docs/dev/table/overview.md
+++ b/docs/content.zh/docs/dev/table/overview.md
@@ -33,74 +33,11 @@ Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream API。你可
## Table 程序依赖
-取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:
+您需要将 Table API 作为依赖项添加到项目中,以便用 Table API 和 SQL 定义数据管道。
-{{< tabs "94f8aceb-507f-4c8f-977e-df00fe903203" >}}
-{{< tab "Java" >}}
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table-api-java-bridge{{< scala_version >}}</artifactId>
-  <version>{{< version >}}</version>
-  <scope>provided</scope>
-</dependency>
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table-api-scala-bridge{{< scala_version >}}</artifactId>
-  <version>{{< version >}}</version>
-  <scope>provided</scope>
-</dependency>
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-{{< stable >}}
-```bash
-$ python -m pip install apache-flink {{< version >}}
-```
-{{< /stable >}}
-{{< unstable >}}
-```bash
-$ python -m pip install apache-flink
-```
-{{< /unstable >}}
-{{< /tab >}}
-{{< /tabs >}}
+有关如何为 Java 和 Scala 配置这些依赖项的更多细节,请查阅[项目配置]({{< ref "docs/dev/configuration/overview" >}})小节。
-除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table-planner{{< scala_version >}}</artifactId>
-  <version>{{< version >}}</version>
-  <scope>provided</scope>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala{{< scala_version >}}</artifactId>
-  <version>{{< version >}}</version>
-  <scope>provided</scope>
-</dependency>
-```
-
-### 扩展依赖
-
-如果你想实现[自定义格式或连接器]({{< ref "docs/dev/table/sourcesSinks" >}}) 用于(反)序列化行或一组[用户定义的函数]({{< ref "docs/dev/table/functions/udfs" >}}),下面的依赖就足够了,编译出来的 jar 文件可以直接给 SQL Client 使用:
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table-common</artifactId>
-  <version>{{< version >}}</version>
-  <scope>provided</scope>
-</dependency>
-```
-
-{{< top >}}
+如果您使用 Python,请查阅 [Python API]({{< ref "docs/dev/python/overview" >}}) 文档。
接下来?
-----------------
diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md
index 4f1c5080d68ce..d1940bab57929 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -106,6 +106,33 @@ that the planner can handle.
{{< top >}}
+
+Project Configuration
+---------------------
+
+If you want to implement a custom connector or a custom format, the following dependency is usually
+sufficient:
+
+{{< artifact_tabs flink-table-common withProvidedScope >}}
+
+If you want to develop a connector that needs to bridge with DataStream APIs (i.e. if you want to adapt
+a DataStream connector to the Table API), you need to add this dependency:
+
+{{< artifact_tabs flink-table-api-java-bridge withProvidedScope >}}
+
+When developing the connector/format, we suggest shipping both a thin JAR and an uber JAR, so users
+can easily load the uber JAR in the SQL client or in the Flink distribution and start using it.
+The uber JAR should include all the third-party dependencies of the connector,
+excluding the table dependencies listed above.
+
+{{< hint warning >}}
+You should not depend on `flink-table-planner{{< scala_version >}}` in production code.
+With the new module `flink-table-planner-loader` introduced in Flink 1.15, the
+application's classpath will not have direct access to `org.apache.flink.table.planner` classes anymore.
+If you need a feature available only internally within the `org.apache.flink.table.planner` package and subpackages, please open an issue.
+To learn more, check out [Anatomy of Table Dependencies]({{< ref "docs/dev/configuration/advanced" >}}#anatomy-of-table-dependencies).
+{{< /hint >}}
+
Extension Points
----------------
diff --git a/docs/content.zh/docs/dev/table/sql/queries/match_recognize.md b/docs/content.zh/docs/dev/table/sql/queries/match_recognize.md
index e6b11fc99af4c..9f3e15b0edd0d 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/match_recognize.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/match_recognize.md
@@ -84,7 +84,7 @@ Flink 的 `MATCH_RECOGNIZE` 子句实现是一个完整标准子集。仅支持
```
-或者,也可以将依赖项添加到集群的 classpath(查看 [dependency section]({{< ref "docs/dev/datastream/project-configuration" >}}) 获取更多相关依赖信息)。
+或者,也可以将依赖项添加到集群的 classpath(查看 [dependency section]({{< ref "docs/dev/configuration/overview" >}}) 获取更多相关依赖信息)。
如果你想在 [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}) 中使用 `MATCH_RECOGNIZE` 子句,你无需执行任何操作,因为默认情况下包含所有依赖项。
diff --git a/docs/content.zh/docs/dev/table/sqlClient.md b/docs/content.zh/docs/dev/table/sqlClient.md
index 2239121c50c8f..6f6067010b36a 100644
--- a/docs/content.zh/docs/dev/table/sqlClient.md
+++ b/docs/content.zh/docs/dev/table/sqlClient.md
@@ -368,16 +368,17 @@ When execute queries or insert statements, please enter the interactive mode or
### Dependencies
-The SQL Client does not require to setup a Java project using Maven or SBT. Instead, you can pass the
-dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR
-file separately (using `--jar`) or define entire library directories (using `--library`). For
+The SQL Client does not require setting up a Java project using Maven, Gradle, or sbt. Instead, you
+can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify
+each JAR file separately (using `--jar`) or define entire library directories (using `--library`). For
connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON),
Flink provides **ready-to-use JAR bundles**. These JAR files can be downloaded for each release from
the Maven central repository.
-The full list of offered SQL JARs and documentation about how to use them can be found on the [connection to external systems page]({{< ref "docs/connectors/table/overview" >}}).
+The full list of offered SQL JARs can be found on the [connection to external systems page]({{< ref "docs/connectors/table/overview" >}}).
-{{< top >}}
+You can refer to the [configuration]({{< ref "docs/dev/configuration/connector" >}}) section for
+information on how to configure connector and format dependencies.
Use SQL Client to submit job
----------------------------
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index 860464a0f6b84..65a9f9d87a257 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -1455,7 +1455,9 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
{{< label Batch >}} {{< label Streaming >}}
-和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。`executeInsert()` 方法将立即提交执行插入操作的 Flink job。
+和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。
+`insertInto()` 方法会将 `INSERT INTO` 转换为一个 `TablePipeline`。
+该管道可以用 `TablePipeline.explain()` 来解释,用 `TablePipeline.execute()` 来执行。
输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。
@@ -1463,13 +1465,13 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
{{< tab "Java" >}}
```java
Table orders = tableEnv.from("Orders");
-orders.executeInsert("OutOrders");
+orders.insertInto("OutOrders").execute();
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val orders = tableEnv.from("Orders")
-orders.executeInsert("OutOrders")
+orders.insertInto("OutOrders").execute()
```
{{< /tab >}}
{{< tab "Python" >}}
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 5d3111a5a77ea..9aa82529a1cce 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -59,7 +59,7 @@ Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployme
{{< tab "Java" >}}
```java
// instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
// access flink configuration
TableConfig configuration = tEnv.getConfig();
@@ -88,11 +88,11 @@ configuration.set("table.exec.mini-batch.size", "5000") // the maximum number of
t_env = ...
# access flink configuration
-configuration = t_env.get_config().get_configuration();
+configuration = t_env.get_config()
# set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true"); # enable mini-batch optimization
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s"); # use 5 seconds to buffer input records
-configuration.set_string("table.exec.mini-batch.size", "5000"); # the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true") # enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s") # use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000") # the maximum number of records can be buffered by each aggregate operator task
```
{{< /tab >}}
{{< /tabs >}}
@@ -121,7 +121,7 @@ GROUP BY color
{{< tab "Java" >}}
```java
// instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
@@ -152,12 +152,12 @@ configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable t
t_env = ...
# access flink configuration
-configuration = t_env.get_config().get_configuration();
+configuration = t_env.get_config()
# set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true"); # local-global aggregation depends on mini-batch is enabled
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s");
-configuration.set_string("table.exec.mini-batch.size", "5000");
-configuration.set_string("table.optimizer.agg-phase-strategy", "TWO_PHASE"); # enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true") # local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") # enable two-phase, i.e. local-global aggregation
```
{{< /tab >}}
{{< /tabs >}}
@@ -206,7 +206,7 @@ GROUP BY day
{{< tab "Java" >}}
```java
// instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
tEnv.getConfig()
.set("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
@@ -226,8 +226,7 @@ tEnv.getConfig
# instantiate table environment
t_env = ...
-t_env.get_config()
- .set("table.optimizer.distinct-agg.split.enabled", "true"); # enable distinct agg split
+t_env.get_config().set("table.optimizer.distinct-agg.split.enabled", "true") # enable distinct agg split
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content.zh/docs/flinkDev/ide_setup.md b/docs/content.zh/docs/flinkDev/ide_setup.md
index f17b32d36519a..a4c985917b328 100644
--- a/docs/content.zh/docs/flinkDev/ide_setup.md
+++ b/docs/content.zh/docs/flinkDev/ide_setup.md
@@ -28,7 +28,7 @@ under the License.
# 导入 Flink 到 IDE 中
-以下章节描述了如何将 Flink 项目导入到 IDE 中以进行 Flink 本身的源码开发。有关 Flink 程序编写的信息,请参阅 [Java API]({{< ref "docs/dev/datastream/project-configuration" >}}) 和 [Scala API]({{< ref "docs/dev/datastream/project-configuration" >}}) 快速入门指南。
+以下章节描述了如何将 Flink 项目导入到 IDE 中以进行 Flink 本身的源码开发。有关 Flink 程序编写的信息,请参阅 [Java API]({{< ref "docs/dev/configuration/overview" >}}) 和 [Scala API]({{< ref "docs/dev/configuration/overview" >}}) 快速入门指南。
{{< hint info >}}
每当你的 IDE 无法正常工作时,请优先尝试使用 Maven 命令行(`mvn clean package -DskipTests`),因为它可能是由于你的 IDE 中存在错误或未正确设置。
diff --git a/docs/content.zh/docs/learn-flink/datastream_api.md b/docs/content.zh/docs/learn-flink/datastream_api.md
index fdad73b9588fe..a032c96d65b7f 100644
--- a/docs/content.zh/docs/learn-flink/datastream_api.md
+++ b/docs/content.zh/docs/learn-flink/datastream_api.md
@@ -1,6 +1,6 @@
---
title: DataStream API 简介
-weight: 2
+weight: 3
type: docs
---
diff --git a/docs/content.zh/docs/ops/state/checkpoints_vs_savepoints.md b/docs/content.zh/docs/ops/state/checkpoints_vs_savepoints.md
new file mode 100644
--- /dev/null
+++ b/docs/content.zh/docs/ops/state/checkpoints_vs_savepoints.md
+
+# Checkpoints vs. Savepoints
+
+## Overview
+
+Conceptually, Flink's [savepoints]({{< ref "docs/ops/state/savepoints" >}}) are different from [checkpoints]({{< ref "docs/ops/state/checkpoints" >}})
+in a way that's analogous to how backups are different from recovery logs in traditional database systems.
+
+The primary purpose of checkpoints is to provide a recovery mechanism in case of unexpected job failures.
+A [checkpoint's lifecycle]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}}) is managed by Flink,
+i.e. a checkpoint is created, owned, and released by Flink - without user interaction.
+Because checkpoints are triggered often and are relied upon for failure recovery, the two main design goals for the checkpoint implementation are
+i) being as lightweight to create and ii) as fast to restore from as possible.
+Optimizations towards those goals can exploit certain properties, e.g., that the job code doesn't change between the execution attempts.
+
+{{< hint info >}}
+- Checkpoints are automatically deleted if the application is terminated by the user
+(except if checkpoints are explicitly configured to be retained).
+- Checkpoints are stored in a state backend-specific (native) data format (which may be incremental, depending on the specific backend).
+{{< /hint >}}
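+
+As an illustration, here is a minimal sketch (assuming the DataStream API's `CheckpointConfig`) of enabling periodic checkpoints and retaining them when the job is cancelled:
+
+```java
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// trigger a checkpoint every 10 seconds
+env.enableCheckpointing(10_000);
+
+// keep checkpoints when the job is cancelled instead of deleting them automatically
+env.getCheckpointConfig().enableExternalizedCheckpoints(
+        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+```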
+
+Although [savepoints]({{< ref "docs/ops/state/savepoints" >}}) are created internally with the same mechanisms as
+checkpoints, they are conceptually different and can be a bit more expensive to produce and restore from. Their design focuses
+more on portability and operational flexibility, especially with respect to changes to the job.
+The use case for savepoints is planned, manual operations. For example, this could be an update of your Flink version, a change to your job graph, and so on.
+
+{{< hint info >}}
+- Savepoints are created, owned, and deleted solely by the user.
+That means Flink deletes savepoints neither after job termination nor after
+restore.
+- Savepoints are stored in a state backend independent (canonical) format. (Note: Since Flink 1.15, savepoints can also be stored in
+the backend-specific [native]({{< ref "docs/ops/state/savepoints" >}}#savepoint-format) format, which is faster to create
+and restore from but comes with some limitations.)
+{{< /hint >}}
+
+### Capabilities and limitations
+The following table gives an overview of capabilities and limitations for the various types of savepoints and
+checkpoints.
+- ✓ - Flink fully supports this type of snapshot
+- x - Flink doesn't support this type of snapshot
+- ! - While these operations currently work, Flink doesn't officially guarantee support for them, so there is a certain level of risk associated with them
+
+| Operation | Canonical Savepoint | Native Savepoint | Aligned Checkpoint | Unaligned Checkpoint |
+|:----------------------------------|:--------------------|:-----------------|:-------------------|:---------------------|
+| State backend change | ✓ | x | x | x |
+| State Processor API (writing) | ✓ | x | x | x |
+| State Processor API (reading) | ✓ | ! | ! | x |
+| Self-contained and relocatable | ✓ | ✓ | x | x |
+| Schema evolution | ✓ | ! | ! | ! |
+| Arbitrary job upgrade | ✓ | ✓ | ✓ | x |
+| Non-arbitrary job upgrade | ✓ | ✓ | ✓ | x |
+| Flink minor version upgrade | ✓ | ✓ | ✓ | x |
+| Flink bug/patch version upgrade | ✓ | ✓ | ✓ | ✓ |
+| Rescaling | ✓ | ✓ | ✓ | ✓ |
+
+- [State backend change]({{< ref "docs/ops/state/state_backends" >}}) - configuring a different State Backend than was used when taking the snapshot.
+- [State Processor API (writing)]({{< ref "docs/libs/state_processor_api" >}}#writing-new-savepoints) - the ability to create a new snapshot of this type via the State Processor API.
+- [State Processor API (reading)]({{< ref "docs/libs/state_processor_api" >}}#reading-state) - the ability to read states from an existing snapshot of this type via the State Processor API.
+- Self-contained and relocatable - a snapshot directory contains everything it needs for recovery
+and does not depend on other snapshots, which means it can easily be moved to another location if needed.
+- [Schema evolution]({{< ref "docs/dev/datastream/fault-tolerance/serialization/schema_evolution" >}}) - the *state* data type can be changed if it uses a serializer that supports schema evolution (e.g., POJOs and Avro types)
+- Arbitrary job upgrade - the snapshot can be restored even if the [partitioning types]({{< ref "docs/dev/datastream/operators/overview" >}}#physical-partitioning) (rescale, rebalance, map, etc.)
+or in-flight record types for the existing operators have changed.
+- Non-arbitrary job upgrade - restoring the snapshot is possible with updated operators if the job graph topology and in-flight record types remain unchanged.
+- Flink minor version upgrade - restoring a snapshot taken with an older minor version of Flink (1.x → 1.y).
+- Flink bug/patch version upgrade - restoring a snapshot taken with an older patch version of Flink (1.14.x → 1.14.y).
+- Rescaling - restoring the snapshot with a different parallelism than was used during the snapshot creation.
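+
+As a hedged illustration of the "State backend change" and "Rescaling" rows above (the savepoint path and the original backend are made-up examples; in practice the savepoint path is usually passed via the CLI `-s` flag):
+
+```java
+// Restore from a canonical savepoint while switching the state backend and the parallelism.
+Configuration conf = new Configuration();
+conf.setString("execution.savepoint.path", "s3://my-bucket/savepoints/savepoint-abc123"); // placeholder
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+env.setStateBackend(new EmbeddedRocksDBStateBackend()); // job previously used HashMapStateBackend
+env.setParallelism(8); // rescaling: a different parallelism than at snapshot time
+```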
+
+
+{{< top >}}
diff --git a/docs/content.zh/docs/ops/state/savepoints.md b/docs/content.zh/docs/ops/state/savepoints.md
index 51785b67e563d..11f336898d93c 100644
--- a/docs/content.zh/docs/ops/state/savepoints.md
+++ b/docs/content.zh/docs/ops/state/savepoints.md
@@ -27,7 +27,7 @@ under the License.
# Savepoints
-## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?
+## 什么是 Savepoint ?
Savepoint 是依据 Flink [checkpointing 机制]({{< ref "docs/learn-flink/fault_tolerance" >}})所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
@@ -35,11 +35,7 @@ Savepoint 是依据 Flink [checkpointing 机制]({{< ref "docs/learn-flink/fault
**注意:** 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。
{{< /hint >}}
-从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 的主要目的是为意外失败的作业提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 Checkpoint)。
-
- 与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在作业停止后继续存在。 从概念上讲,Savepoint 的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。
-
-除去这些概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。
+To make proper use of savepoints, it's important to understand the differences between [checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) and savepoints, which are described in [checkpoints vs. savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}).
## 分配算子 ID
@@ -157,10 +153,55 @@ $ bin/flink run -s :savepointPath [:runArgs]
默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 `--allowNonRestoredState`(short:`-n`)选项跳过无法映射到新程序的状态:
+#### Restore 模式
+
+`Restore 模式` 决定了在 restore 之后,Savepoint 或者 [externalized checkpoint]({{< ref "docs/ops/state/checkpoints" >}}/#resuming-from-a-retained-checkpoint) 文件的所有权归属。在这种语境下 Savepoint 和 externalized checkpoint 的行为相似。
+这里我们将它们都称为“快照”,除非另有明确说明。
+
+如前所述,restore 模式决定了谁来接管我们从中恢复的快照文件的所有权。快照可被用户或者 Flink 自身拥有。如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。
+
+每种 restore 模式都有特定的用途。尽管如此,我们仍然认为默认的 *NO_CLAIM* 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。
+
+你可以通过如下方式指定 restore 模式:
```shell
-$ bin/flink run -s :savepointPath -n [:runArgs]
+$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]
```
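+
+The restore mode can also be set via the job configuration; a hedged sketch, assuming the `execution.savepoint-restore-mode` option from the configuration page and a placeholder savepoint path:
+
+```java
+Configuration conf = new Configuration();
+conf.setString("execution.savepoint.path", "s3://my-bucket/savepoints/savepoint-abc123"); // placeholder
+conf.setString("execution.savepoint-restore-mode", "NO_CLAIM"); // or "CLAIM" / "LEGACY"
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+```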
+**NO_CLAIM (默认的)**
+
+在 *NO_CLAIM* 模式下,Flink 不会接管快照的所有权。它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件。该模式下可以从同一个快照上启动多个作业。
+
+为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的。这仅对 `state.backend: rocksdb` 有影响,因为其他 backend 总是创建全量 checkpoint。
+
+第一个全量 checkpoint 完成后,所有后续的 checkpoint 会照常创建。因此,一旦有 checkpoint 成功制作,就可以删除原始快照。在此之前不能删除原始快照,因为此时还没有任何已完成的 checkpoint,Flink 发生故障时会尝试从该初始快照恢复。
+
+
+ {{< img src="/fig/restore-mode-no_claim.svg" alt="NO_CLAIM restore mode" width="70%" >}}
+
+
+**CLAIM**
+
+另一个可选的模式是 *CLAIM* 模式。该模式下 Flink 将声称拥有快照的所有权,并且本质上会把它当作 checkpoint 来对待:控制其生命周期,并且可能会在其不再被用于恢复时将其删除。因此,手动删除该快照或者从同一个快照上启动两个作业都是不安全的。Flink 会保留[配置数量]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}}/#state-checkpoints-num-retained)的 checkpoint。
+
+
+ {{< img src="/fig/restore-mode-claim.svg" alt="CLAIM restore mode" width="70%" >}}
+
+
+{{< hint info >}}
+**注意:**
+1. Retained checkpoints 被存储在类似 `<checkpoint_dir>/<job_id>/chk-<x>` 的目录中。Flink 不会接管 `<checkpoint_dir>/<job_id>` 目录的所有权,而只会接管 `chk-<x>` 目录的所有权。Flink 也不会删除旧作业的目录。
+
+2. [Native](#savepoint-format) 格式支持增量的 RocksDB savepoint。对于这类 savepoint,Flink 会将所有 SST 文件存储在 savepoint 目录中,因此这些 savepoint 是自包含且可整体移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoint 可能会复用其中的一些 SST 文件,这会阻止在清理 savepoint 时删除 savepoint 目录。Flink 之后在运行期间可能会删除被复用的 SST 文件,但不会删除 savepoint 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoint 目录。
+{{< /hint >}}
+
+**LEGACY**
+
+Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除用于初始恢复的 checkpoint,同时用户也不清楚自己是否可以删除它。导致该问题的原因是,Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复所用 checkpoint 的所有权没有明确的界定。
+
+
+ {{< img src="/fig/restore-mode-legacy.svg" alt="LEGACY restore mode" width="70%" >}}
+
+
### 删除 Savepoint
```shell
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index e6d6764a0f3d2..6c8c4593686b0 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -38,7 +38,9 @@ under the License.
在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。
状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 **State Backend**。
-# 可用的 State Backends
+
+
+## 可用的 State Backends
Flink 内置了以下这些开箱即用的 state backends :
@@ -48,6 +50,8 @@ Flink 内置了以下这些开箱即用的 state backends :
如果不设置,默认使用 HashMapStateBackend。
+
+
### HashMapStateBackend
在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。
@@ -59,6 +63,10 @@ HashMapStateBackend 的适用场景:
建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。
+与 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 将数据以对象形式存储在堆中,因此重用这些对象数据是不安全的。
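+
+A hedged sketch of what "unsafe" means here (the class and state names are made up for illustration): with the heap backend, `state.value()` hands back the stored object itself rather than a copy, so reusing or mutating that object silently changes the stored state.
+
+```java
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+// Illustrative only; intended to run on a keyed stream, e.g. input.keyBy(arr -> arr[0]).map(new LastValueMap()).
+public class LastValueMap extends RichMapFunction<long[], Long> {
+    private transient ValueState<long[]> lastSeen;
+
+    @Override
+    public void open(Configuration parameters) {
+        lastSeen = getRuntimeContext().getState(
+                new ValueStateDescriptor<>("last-seen", long[].class));
+    }
+
+    @Override
+    public Long map(long[] value) throws Exception {
+        long[] previous = lastSeen.value();
+        // With HashMapStateBackend, `previous` is the object held in state: mutating it here
+        // would change the state without any update() call. Store a defensive copy instead.
+        lastSeen.update(value.clone());
+        return (previous == null || previous.length == 0) ? 0L : previous[0];
+    }
+}
+```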
+
+
+
### EmbeddedRocksDBStateBackend
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。
@@ -79,6 +87,7 @@ EmbeddedRocksDBStateBackend 的适用场景:
注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 HashMapStateBackend 相比,EmbeddedRocksDBStateBackend 允许存储非常大的状态。
然而,这也意味着使用 EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低。
所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。
+同时因为存在这些序列化、反序列化操作,重用放入 EmbeddedRocksDBStateBackend 的对象是安全的。
请同时参考 [Task Executor 内存配置]({{< ref "docs/deployment/memory/mem_tuning" >}}#rocksdb-state-backend) 中关于 EmbeddedRocksDBStateBackend 的建议。
@@ -88,7 +97,9 @@ EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Bac
每个 slot 中的 RocksDB instance 的内存大小是有限制的,详情请见 [这里]({{< ref "docs/ops/state/large_state_tuning" >}})。
-# 选择合适的 State Backend
+
+
+## 选择合适的 State Backend
在选择 `HashMapStateBackend` 和 `RocksDB` 的时候,其实就是在性能与可扩展性之间权衡。`HashMapStateBackend` 是非常快的,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上;但是状态的大小受限于集群中可用的内存。
另一方面,`RocksDB` 可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot。
@@ -99,11 +110,15 @@ EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Bac
从 1.13 版本开始,所有的 state backends 都会生成一种普适的格式。因此,如果想切换 state backend 的话,那么最好先升级你的 Flink 版本,在新版本中生成 savepoint,在这之后你才可以使用一个不同的 state backend 来读取并恢复它。
{{< /hint >}}
+
+
## 设置 State Backend
如果没有明确指定,将使用 jobmanager 做为默认的 state backend。你能在 **flink-conf.yaml** 中为所有 Job 设置其他默认的 State Backend。
每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置,如下所示:
+
+
### 设置每个 Job 的 State Backend
`StreamExecutionEnvironment` 可以对每个 Job 的 State Backend 进行设置,如下所示:
@@ -138,6 +153,7 @@ env.setStateBackend(new HashMapStateBackend())
**注意:** 由于 RocksDB 是 Flink 默认分发包的一部分,所以如果你没在代码中使用 RocksDB,则不需要添加此依赖。而且可以在 `flink-conf.yaml` 文件中通过 `state.backend` 配置 State Backend,以及更多的 [checkpointing]({{< ref "docs/deployment/config" >}}#checkpointing) 和 [RocksDB 特定的]({{< ref "docs/deployment/config" >}}#rocksdb-state-backend) 参数。
{{< /hint >}}
+
### 设置默认的(全局的) State Backend
@@ -163,10 +179,14 @@ state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
```
-# RocksDB State Backend 进阶
+
+
+## RocksDB State Backend 进阶
*该小节描述 RocksDB state backend 的更多细节*
+
+
### 增量快照
RocksDB 支持*增量快照*。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
@@ -181,6 +201,8 @@ RocksDB 支持*增量快照*。不同于产生一个包含所有数据的全量
需要注意的是,一旦启用了增量快照,网页上展示的 `Checkpointed Data Size` 只代表增量上传的数据量,而不是一次快照的完整数据量。
+
+
### 内存管理
Flink 致力于控制整个进程的内存消耗,以确保 Flink 任务管理器(TaskManager)有良好的内存使用,从而既不会在容器(Docker/Kubernetes, Yarn等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。
@@ -210,6 +232,8 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 `state.backend.rocksdb.memory.fixed-per-slot` 选项)。
注意在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。
+
+
### 计时器(内存 vs. RocksDB)
计时器(Timer)用于安排稍后的操作(基于事件时间或处理时间),例如触发窗口或回调 `ProcessFunction`。
@@ -220,6 +244,8 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
注意 *在 RocksDB state backend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照。其他状态(如 keyed state)可以被异步快照。*
+
+
### 开启 RocksDB 原生监控指标
您可以选择使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标,并且可以选择性的指定特定指标进行汇报。
@@ -229,7 +255,7 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
**注意:** 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。
{{< /hint >}}
-### 列族(ColumnFamily)级别的预定义选项
+#### 列族(ColumnFamily)级别的预定义选项
注意 在引入 [RocksDB 使用托管内存](#memory-management) 功能后,此机制应限于在*专家调优*或*故障处理*中使用。
@@ -246,7 +272,7 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
RocksDB State Backend 会将 [这里定义]({{< ref "docs/deployment/config" >}}#advanced-rocksdb-state-backends-options) 的所有配置项全部加载。
因此您可以简单的通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。
-### 通过 RocksDBOptionsFactory 配置 RocksDB 选项
+#### 通过 RocksDBOptionsFactory 配置 RocksDB 选项
注意 在引入 [RocksDB 使用托管内存](#memory-management) 功能后,此机制应限于在*专家调优*或*故障处理*中使用。
@@ -303,77 +329,75 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
{{< top >}}
-## Enabling Changelog
+
+
+## 开启 Changelog
+
+{{< hint warning >}} 该功能处于实验状态。 {{< /hint >}}
+
+{{< hint warning >}} 开启 Changelog 可能会给您的应用带来性能损失。(见下文) {{< /hint >}}
+
+
+
+### 介绍
-{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+Changelog 是一项旨在减少 checkpointing 时间的功能,因此也可以减少 exactly-once 模式下的端到端延迟。
-{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}}
+一般情况下 checkpoint 的持续时间受如下因素影响:
-### Introduction
+1. Barrier 到达和对齐时间,可以通过 [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) 和 [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) 解决(参见本列表之后的示例)。
-Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode.
+2. 快照制作时间(所谓同步阶段),可以通过异步快照解决(如[上文]({{< ref "#the-embeddedrocksdbstatebackend">}})所述)。
-Most commonly, checkpoint duration is affected by:
+3. 快照上传时间(异步阶段)。
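+
+As a hedged illustration of item 1 (per-job API as in `CheckpointConfig`; buffer debloating itself is a TaskManager-level option in `flink-conf.yaml` and is not shown here):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(60_000);                        // checkpoint every 60 s
+env.getCheckpointConfig().enableUnalignedCheckpoints(); // cut barrier alignment time under backpressure
+```
+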
-1. Barrier travel time and alignment, addressed by
- [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
- and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
-2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{<
- ref "#the-embeddedrocksdbstatebackend">}}))
-4. Snapshot upload time (asynchronous phase)
+可以用[增量 checkpoints]({{< ref "#incremental-checkpoints" >}}) 来减少上传时间。但是,大多数支持增量 checkpoint 的状态后端会定期执行某种形式的合并(compaction)操作,这会导致除了新的变更之外还要重新上传旧状态。在大规模部署中,每次 checkpoint 都至少有一个 task 需要上传大量数据的可能性往往非常高。
-Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}).
-However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the
-old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of
-data tends to be very high in every checkpoint.
+开启 Changelog 功能之后,Flink 会不断上传状态变更并形成 changelog。创建 checkpoint 时,只有 changelog 中的相关部分需要上传。而配置的状态后端则会定期在后台进行快照,快照成功上传后,相关的 changelog 将会被截断。
-With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant
-part of this changelog needs to be uploaded. The configured state backend is snapshotted in the
-background periodically. Upon successful upload, the changelog is truncated.
+基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善。
-As a result, asynchronous phase duration is reduced, as well as synchronous phase - because no data needs to be flushed
-to disk. In particular, long-tail latency is improved.
+但是,资源使用会变得更高:
-However, resource usage is higher:
+- 将会在 DFS 上创建更多文件
+- 将可能在 DFS 上残留更多未被删除的文件(这将在未来版本中通过 FLINK-25511 和 FLINK-25512 解决)
+- 将使用更多的 IO 带宽用来上传状态变更
+- 将使用更多 CPU 资源来序列化状态变更
+- Task Managers 将会使用更多内存来缓存状态变更
-- more files are created on DFS
-- more files can be left undeleted DFS (this will be addressed in the future versions in FLINK-25511 and FLINK-25512)
-- more IO bandwidth is used to upload state changes
-- more CPU used to serialize state changes
-- more memory used by Task Managers to buffer state changes
+另一项需要考虑的事情是恢复时间。取决于 `state.backend.changelog.periodic-materialize.interval` 的设置,changelog 可能会变得很长,因而重放它会花费更多时间。即便如此,恢复时间加上 checkpoint 持续时间仍然很可能低于不开启 changelog 功能时的总时间,从而在故障恢复的情况下也能提供更低的端到端延迟。当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。
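+
+A hedged sketch of bounding the changelog length via the materialization interval mentioned above (the interval value is only an example):
+
+```java
+Configuration conf = new Configuration();
+conf.setString("state.backend.changelog.enabled", "true");
+conf.setString("state.backend.changelog.periodic-materialize.interval", "3 min"); // example value
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+```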
-Recovery time is another thing to consider. Depending on the `state.backend.changelog.periodic-materialize.interval`
-setting, the changelog can become lengthy and replaying it may take more time. However, recovery time combined with
-checkpoint duration will likely still be lower than in non-changelog setups, providing lower end-to-end latency even in
-failover case. However, it's also possible that the effective recovery time will increase, depending on the actual ratio
-of the aforementioned times.
+有关更多详细信息,请参阅 [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints)。
-For more details, see [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints).
+
-### Installation
+### 安装
-Changelog JARs are included into the standard Flink distribution.
+标准的 Flink 发行版中包含了 Changelog 所需的 JAR 包。
-Make sure to [add]({{< ref "docs/deployment/filesystems/overview" >}}) the necessary filesystem plugins.
+请确保[添加]({{< ref "docs/deployment/filesystems/overview" >}})所需的文件系统插件。
-### Configuration
+
-Here is an example configuration in YAML:
+### 配置
+
+以下是 YAML 格式的示例配置:
```yaml
state.backend.changelog.enabled: true
-state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
-dstl.dfs.base-path: s3:// # similar to state.checkpoints.dir
+state.backend.changelog.storage: filesystem # 当前只支持 filesystem 和 memory(仅供测试用)
+dstl.dfs.base-path: s3:// # 类似于 state.checkpoints.dir
```
-Please keep the following defaults (see [limitations](#limitations)):
+请将如下配置保持默认值 (参见[限制](#limitations)):
```yaml
execution.checkpointing.max-concurrent-checkpoints: 1
state.backend.local-recovery: false
```
-Please refer to the [configuration section]({{< ref "docs/deployment/config#state-changelog-options" >}}) for other options.
+有关其他配置选项,请参阅[配置]({{< ref "docs/deployment/config#state-changelog-options" >}})部分。
-Changelog can also be enabled or disabled per job programmatically:
+也可以通过编程方式为每个作业开启或关闭 Changelog:
{{< tabs >}}
{{< tab "Java" >}}
```java
@@ -395,39 +419,45 @@ env.enable_changelog_statebackend(true)
{{< /tab >}}
{{< /tabs >}}
-### Monitoring
+
+
+### 监控
+
+[此处]({{< ref "docs/ops/metrics#state-changelog" >}})列出了可用的指标。
-Available metrics are listed [here]({{< ref "docs/ops/metrics#changelog" >}}).
+如果 task 因写入状态变更而被反压,它将在 UI 中显示为忙碌(红色)。
-If a task is backpressured by writing state changes, it will be shown as busy (red) in the UI.
+
-### Upgrading existing jobs
+### 升级现有作业
-**Enabling Changelog**
+**开启 Changelog**
-Resuming only from savepoints in canonical format is supported:
-- given an existing non-changelog job
-- take a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (canonical format is the default)
-- alter configuration (enable Changelog)
-- resume from the taken snapshot
+仅支持从标准格式的 savepoint 恢复:
+- 给定一个没有开启 Changelog 的作业
+- 创建一个 [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (默认为标准格式)
+- 更改配置(开启 Changelog)
+- 从创建的 snapshot 恢复
-**Disabling Changelog**
+**关闭 Changelog**
-Resuming only from [savepoints]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}})
-is supported. Resuming from [checkpoints]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
-is planned in the future versions.
+仅支持从 [savepoints]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) 恢复。从 [checkpoints]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}}) 恢复计划在未来版本中支持。
-**State migration** (including changing TTL) is currently not supported
+当前不支持**状态迁移**(包括改变 TTL)。
-### Limitations
- - At most one concurrent checkpoint
- - Local recovery not supported
- - As of Flink 1.15, only `filesystem` changelog implementation is available
- - State migration (including changing TTL) is currently not supported
-- [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) mode not supported
+
+
+### 限制
+- 最多同时创建一个 checkpoint
+- 本地恢复暂不支持
+- 截至 Flink 1.15,只有 `filesystem` changelog 实现可用
+- 尚不支持状态迁移(包括修改 TTL)
+- 尚不支持 [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) 模式
{{< top >}}
+
+
## 自旧版本迁移
从 **Flink 1.13** 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。
diff --git a/docs/content.zh/docs/ops/upgrading.md b/docs/content.zh/docs/ops/upgrading.md
index 73d4c3cf7604b..b12111e43bd9b 100644
--- a/docs/content.zh/docs/ops/upgrading.md
+++ b/docs/content.zh/docs/ops/upgrading.md
@@ -216,6 +216,8 @@ Savepoints are compatible across Flink versions as indicated by the table below:
1.11.x |
1.12.x |
1.13.x |
+ 1.14.x |
+ 1.15.x |
Limitations |
|