diff --git a/docs/content.zh/docs/connectors/datastream/formats/parquet.md b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
index 87c1a1b3dd794..16b0249c72bb8 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -26,21 +26,22 @@ under the License.
 -->
 
+
+
 # Parquet format
 
-Flink supports reading [Parquet](https://parquet.apache.org/) files,
-producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
-To use the format you need to add the `flink-parquet` dependency to your project:
+Flink supports reading [Parquet](https://parquet.apache.org/) files and producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} as well as [Avro](https://avro.apache.org/) records.
+To use the Parquet format, you need to add the `flink-parquet` dependency to your project:
 
 ```xml
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-parquet</artifactId>
     <version>{{< version >}}</version>
 </dependency>
 ```
 
-To read Avro records, you will need to add the `parquet-avro` dependency:
+To read Avro records, you also need to add the `parquet-avro` dependency to your project:
 
 ```xml
 
@@ -61,83 +62,78 @@ To read Avro records, you will need to add the `parquet-avro` dependency:
 
 ```
 
-This format is compatible with the new Source that can be used in both batch and streaming execution modes.
-Thus, you can use this format for two kinds of data:
+This format is compatible with the new Source and can be used in both batch and streaming execution modes.
+Thus, you can use this format for the following two kinds of data:
 
 - Bounded data: lists all files and reads them all.
 - Unbounded data: monitors a directory for new files that appear.
 
 {{< hint info >}}
-When you start a File Source it is configured for bounded data by default.
-To configure the File Source for unbounded data, you must additionally call
+When you start a File Source, it reads bounded data by default.
+To use the File Source in continuous-monitoring mode, you must additionally call
 `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
 {{< /hint >}}
 
 **Vectorized reader**
 
 ```java
 // Parquet rows are decoded in batches
 FileSource.forBulkFileFormat(BulkFormat, Path...)
 
 // Monitor the Paths to read data as unbounded data
 FileSource.forBulkFileFormat(BulkFormat, Path...)
         .monitorContinuously(Duration.ofMillis(5L))
         .build();
 ```
 
 **Avro Parquet reader**
 
 ```java
 // Parquet rows are decoded in batches
 FileSource.forRecordStreamFormat(StreamFormat, Path...)
 
 // Monitor the Paths to read data as unbounded data
 FileSource.forRecordStreamFormat(StreamFormat, Path...)
         .monitorContinuously(Duration.ofMillis(5L))
         .build();
 ```
 
 {{< hint info >}}
-Following examples are all configured for bounded data.
-To configure the File Source for unbounded data, you must additionally call
+The following examples are all configured for bounded data.
+To use the File Source in continuous-monitoring mode, you must additionally call
 `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
 {{< /hint >}}
 
+
+
 ## Flink RowData
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+In this example, you will create a DataStream containing Parquet records as Flink RowData. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
+Records are read in batches of 500. The first boolean parameter specifies whether timestamp columns should be interpreted as UTC.
+The second boolean parameter specifies whether the projected Parquet field names should be matched case-sensitively.
+No watermark strategy is needed, because the records do not contain event timestamps.
 
 ```java
 final LogicalType[] fieldTypes =
         new LogicalType[] {
             new DoubleType(), new IntType(), new VarCharType()
         };
 
 final ParquetColumnarRowInputFormat<FileSourceSplit> format =
         new ParquetColumnarRowInputFormat<>(
                 new Configuration(),
                 RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                 500,
                 false,
                 true);
 final FileSource<RowData> source =
         FileSource.forBulkFileFormat(format, /* Flink Path */)
                 .build();
 final DataStream<RowData> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
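+
+The projected columns can then be read back from `RowData` by position (0 = "f7" `DOUBLE`, 1 = "f4" `INT`, 2 = "f99" `VARCHAR`). A minimal sketch of consuming the stream, not part of the upstream example:
+
+```java
+// Access the projected fields positionally; VARCHAR columns are returned
+// as StringData, so convert them explicitly where a String is needed.
+final DataStream<String> formatted =
+        stream.map(
+                (MapFunction<RowData, String>)
+                        row -> row.getDouble(0) + "|" + row.getInt(1) + "|" + row.getString(2));
+```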
 
 ## Avro Records
 
-Flink supports producing three types of Avro records by reading Parquet files:
+Flink supports three ways of reading Parquet files and producing Avro records:
 
 - [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
 - [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
@@ -145,62 +141,62 @@ Flink supports producing three types of Avro records by reading Parquet files:
 
 ### Generic record
 
-Avro schemas are defined using JSON. You can get more information about Avro schemas and types from the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
-This example uses an Avro schema example similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+Avro schemas are defined using JSON. You can find more information about Avro schemas and types in the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
+This example uses an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
 
 ```json lines
 {"namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "favoriteNumber", "type": ["int", "null"]},
      {"name": "favoriteColor", "type": ["string", "null"]}
  ]
 }
 ```
 
-This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details at [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for how to define an Avro schema.
+This schema defines a user record with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
 
-In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
-It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
-After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.
+In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
+The Avro schema is parsed from a JSON string. There are many other ways to parse a schema, e.g. from a java.io.File or a java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
+After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.
 
 ```java
 // parsing avro schema
 final Schema schema =
         new Schema.Parser()
                 .parse(
                         "{\"type\": \"record\", "
                                 + "\"name\": \"User\", "
                                 + "\"fields\": [\n"
                                 + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                                 + "    {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
                                 + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                                 + "  ]\n"
                                 + "  }");
 
 final FileSource<GenericRecord> source =
         FileSource.forRecordStreamFormat(
                         AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
                 .build();
 
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(10L);
 
 final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
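+
+Since `GenericRecord` exposes fields by name as plain `Object`s (Avro strings arrive as `Utf8`), a conversion step is often useful. A minimal sketch, not part of the upstream example:
+
+```java
+// Extract the "name" field from each record; String.valueOf also covers
+// the case where a union-typed field is null.
+final DataStream<String> names =
+        stream.map(
+                (MapFunction<GenericRecord, String>)
+                        record -> String.valueOf(record.get("name")));
+```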
 
 ### Specific record
 
-Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
-Once the classes have been generated, there is no need to use the schema directly in your programs.
-You can either use `avro-tools.jar` to generate code manually or you could use the Avro Maven plugin to perform
-code generation on any .avsc files present in the configured source directory. Please refer to
-[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
+Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
+Once the classes have been generated, there is no need to use the schema directly in your programs.
+You can either use `avro-tools.jar` to generate code manually, or use the Avro Maven plugin to perform
+code generation for any .avsc files present in the configured source directory. Please refer to
+[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
 
-The following example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+This example uses the sample schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
 
 ```json lines
 [
@@ -218,17 +214,17 @@ The following example uses the example schema [testdata.avsc](https://github.com
 ]
 ```
 
-You will use the Avro Maven plugin to generate the `Address` Java class:
+You can use the Avro Maven plugin to generate the `Address` Java class:
 
 ```java
 @org.apache.avro.specific.AvroGenerated
 public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
     // generated code...
 }
 ```
 
-You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
-and then create a DataStream containing Parquet records as Avro Specific records.
+You can create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific records
+and then create a DataStream containing Parquet records as Avro Specific records.
 
 ```java
 final FileSource<Address> source =
@@ -245,12 +241,11 @@ final DataStream<Address> stream =
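+
+Once generated, `Address` behaves like any other Java type, and the `DataStream<Address>` built above can be transformed with its getters. A purely hypothetical sketch — the getter assumes testdata.avsc declares an int field `num`; the schema body is elided in this diff, so substitute a field it actually contains:
+
+```java
+// Hypothetical: map each specific record to one of its generated getters.
+final DataStream<Integer> nums =
+        stream.map(
+                (MapFunction<Address, Integer>)
+                        address -> address.getNum());
+```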
 
 ### Reflect record
 
-Beyond Avro Generic and Specific record that requires a predefined Avro schema,
-Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
-In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
-Java types are mapped to Avro schemas, please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.
+Beyond Avro Generic and Specific records, which require a predefined Avro schema,
+Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
+In this case, Avro uses Java reflection to generate schemas and protocols for these POJO classes.
+Java types are mapped to Avro schemas; please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.
 
-This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
 
 ```java
 public class Datum implements Serializable {
 
@@ -287,8 +282,8 @@ public class Datum implements Serializable {
 }
 ```
 
-You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect record
-and then create a DataStream containing Parquet records as Avro Reflect records.
+You can create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect records
+and then create a DataStream containing Parquet records as Avro Reflect records.
 
 ```java
 final FileSource<Datum> source =
@@ -303,14 +298,12 @@ final DataStream<Datum> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
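+
+To inspect how Avro's reflection maps the POJO's Java types, you can print the derived schema — a small sketch using Avro's `ReflectData`, added here and not part of the upstream example:
+
+```java
+// Derive and pretty-print the Avro schema that reflection produces for Datum.
+final Schema datumSchema = ReflectData.get().getSchema(Datum.class);
+System.out.println(datumSchema.toString(true));
+```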
 
-#### Prerequisite for Parquet files
+#### Prerequisites for Parquet files
 
-In order to support reading Avro reflect records, the Parquet file must contain specific meta information.
-The Avro schema used for creating the Parquet data must contain a `namespace`,
-which will be used by the program to identify the concrete Java class for the reflection process.
+In order to support reading Avro Reflect records, the Parquet file must contain specific meta information.
+The Avro schema used for creating the Parquet data must contain a `namespace`,
+which the program uses to identify the concrete Java class for the reflection process.
 
-The following example shows the `User` schema used previously. But this time it contains a namespace
-pointing to the location(in this case the package), where the `User` class for the reflection could be found.
+The following example shows the `User` schema used previously, but this time with a namespace
+pointing to the location (in this case the package) where the `User` class for the reflection can be found.
 
 ```java
 // avro schema with namespace
 final String schema =
@@ -324,10 +317,9 @@ final String schema =
                         + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                         + "  ]\n"
                         + "  }";
-
 ```
 
-Parquet files created with this schema will contain meta information like:
+Parquet files created with this schema contain meta information like:
 
 ```text
 creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
 
@@ -349,45 +341,44 @@ favoriteColor: BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAI
 ```
 
-With the `User` class defined in the package org.apache.flink.formats.parquet.avro:
+With the `User` class defined in the package `org.apache.flink.formats.parquet.avro`:
 
 ```java
 public class User {
     private String name;
     private Integer favoriteNumber;
    private String favoriteColor;
 
     public User() {}
 
     public User(String name, Integer favoriteNumber, String favoriteColor) {
         this.name = name;
         this.favoriteNumber = favoriteNumber;
         this.favoriteColor = favoriteColor;
     }
 
     public String getName() {
         return name;
     }
 
     public Integer getFavoriteNumber() {
         return favoriteNumber;
     }
 
     public String getFavoriteColor() {
         return favoriteColor;
     }
 }
 ```
 
-you can write the following program to read Avro Reflect records of User type from parquet files:
+you can write the following program to read Avro Reflect records of type User from Parquet files:
 
 ```java
 final FileSource<User> source =
         FileSource.forRecordStreamFormat(
                         AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
                 .build();
 
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(10L);
 
 final DataStream<User> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
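+
+Conversely, to produce Parquet files that already carry this meta information from a Flink job, the `flink-parquet` module also provides Avro-based writer factories. A hedged sketch — assuming the `FileSink` from the file connector and the `AvroParquetWriters` factory available in recent Flink versions:
+
+```java
+// Write the User reflect records back out as Parquet; the resulting files
+// embed the namespaced Avro schema required by the reader above.
+final FileSink<User> sink =
+        FileSink.forBulkFormat(
+                        /* Flink output Path */, AvroParquetWriters.forReflectRecord(User.class))
+                .build();
+
+stream.sinkTo(sink);
+```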