-->

<a name="parquet-format"></a>

# Parquet format

Flink supports reading [Parquet](https://parquet.apache.org/) files and producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} or [Avro](https://avro.apache.org/) records.
To use the format, you need to add the `flink-parquet` dependency to your project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>{{< version >}}</version>
</dependency>
```

To read Avro records, you will also need to add the `parquet-avro` dependency:

```xml
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.2</version>
    <optional>true</optional>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>it.unimi.dsi</groupId>
            <artifactId>fastutil</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

This format is compatible with the new Source that can be used in both batch and streaming execution modes.
Thus, you can use this format for two kinds of data:

- Bounded data: lists all files and reads them all.
- Unbounded data: monitors a directory for new files that appear.

{{< hint info >}}
When you start a File Source, it is configured for bounded data by default.
To configure the File Source for unbounded data, you must additionally call
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
{{< /hint >}}

**Vectorized reader**

```java
// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat, Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat, Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
```

**Avro Parquet reader**

```java
// Parquet rows are decoded in batches
FileSource.forRecordStreamFormat(StreamFormat, Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat, Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
```

{{< hint info >}}
The following examples are all configured for bounded data.
To configure the File Source for unbounded data, you must additionally call
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
{{< /hint >}}

<a name="flink-rowdata"></a>

## Flink RowData

In this example, you will create a DataStream containing Parquet records as Flink RowData. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
The second boolean instructs the application that the projected Parquet field names are case-sensitive.
There is no watermark strategy defined, as the records do not contain event timestamps.

```java
final LogicalType[] fieldTypes =
        new LogicalType[] {
            new DoubleType(), new IntType(), new VarCharType()
        };

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,
                false,
                true);
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, /* Flink Path */)
                .build();
final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
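
The resulting `RowData` records expose the projected columns by position. As a minimal, hedged sketch (assuming the projection order above, with `Types` referring to `org.apache.flink.api.common.typeinfo.Types`), the stream could be consumed like this:

```java
// Positions follow the projection above:
// 0 = "f7" (DOUBLE), 1 = "f4" (INT), 2 = "f99" (VARCHAR).
final DataStream<String> f99Values =
        stream.map(row -> row.getString(2).toString())
                .returns(Types.STRING);
f99Values.print();
```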

## Avro Records

Flink supports producing three types of Avro records by reading Parquet files:

- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Reflect record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)

### Generic record

Avro schemas are defined using JSON. You can find more information about Avro schemas and types in the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
This example uses an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):

```json lines
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favoriteNumber", "type": ["int", "null"]},
     {"name": "favoriteColor", "type": ["string", "null"]}
 ]
}
```

This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) on how to define an Avro schema.

In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.

```java
// parsing avro schema
final Schema schema =
        new Schema.Parser()
                .parse(
                        "{\"type\": \"record\", "
                                + "\"name\": \"User\", "
                                + "\"fields\": [\n"
                                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                                + "    {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                                + "    ]\n"
                                + "    }");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
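
Fields of a `GenericRecord` are looked up by name and returned as `Object`. A small, hedged sketch of consuming the stream (Avro typically returns string fields as `Utf8`, hence the `toString()` call; `Types` comes from `org.apache.flink.api.common.typeinfo.Types`):

```java
// Extract the "name" field from each generic record.
final DataStream<String> names =
        stream.map(record -> record.get("name").toString())
                .returns(Types.STRING);
names.print();
```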

### Specific record

Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
Once the classes have been generated, there is no need to use the schema directly in your program.
You can either use `avro-tools.jar` to generate code manually, or you can use the Avro Maven plugin to perform
code generation on any .avsc files present in the configured source directory (a typical configuration is sketched below). Please refer to
[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
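
As a hedged illustration only (the plugin version and directory layout below are assumptions, not taken from this document), a typical `avro-maven-plugin` configuration could look like this:

```xml
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.0</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <!-- assumed layout: .avsc schemas live under src/main/avro -->
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```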

The following example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):

```json lines
[
  {"namespace": "org.apache.flink.formats.parquet.generated",
   "type": "record",
   "name": "Address",
   "fields": [
       {"name": "num", "type": "int"},
       {"name": "street", "type": "string"},
       {"name": "city", "type": "string"},
       {"name": "state", "type": "string"},
       {"name": "zip", "type": "string"}
   ]
  }
]
```

You will use the Avro Maven plugin to generate the `Address` Java class:

```java
@org.apache.avro.specific.AvroGenerated
public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    // generated code...
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific records
and then create a DataStream containing Parquet records as Avro Specific records.

```java
final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Address> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
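
Since `Address` is a generated `SpecificRecord`, its fields are available through typed getters. A brief, hedged usage sketch (the getter name follows Avro's code-generation conventions for the schema above):

```java
// Avro-generated getters return CharSequence, hence the toString() call.
final DataStream<String> cities =
        stream.map(address -> address.getCity().toString())
                .returns(Types.STRING);
cities.print();
```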

### Reflect record

Beyond Avro Generic and Specific records, which require a predefined Avro schema,
Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
Java types are mapped to Avro schemas; please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.

This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):

```java
public class Datum implements Serializable {

    public String a;
    public int b;

    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        Datum datum = (Datum) o;

        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
    }

    @Override
    public int hashCode() {
        int result = a != null ? a.hashCode() : 0;
        result = 31 * result + b;

        return result;
    }
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect records
and then create a DataStream containing Parquet records as Avro Reflect records.

```java
final FileSource<Datum> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Datum> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
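
Because the stream elements are plain `Datum` POJOs, their public fields can be used directly, for example to key the stream and aggregate per key. A hedged sketch:

```java
// Key by the public String field "a" and keep a running sum of the int field "b".
final DataStream<Datum> runningSums =
        stream.keyBy(datum -> datum.a)
                .reduce((d1, d2) -> new Datum(d1.a, d1.b + d2.b));
runningSums.print();
```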

#### Prerequisite for Parquet files

In order to support reading Avro Reflect records, the Parquet file must contain specific meta information.
The Avro schema used for creating the Parquet data must contain a `namespace`,
which will be used by the program to identify the concrete Java class for the reflection process.

The following example shows the `User` schema used previously, but this time it contains a namespace
pointing to the location (in this case the package) where the `User` class for the reflection can be found.

```java
// avro schema with namespace
final String schema =
        "{\"type\": \"record\", "
                + "\"name\": \"User\", "
                + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
                + "\"fields\": [\n"
                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                + "    {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                + "    ]\n"
                + "    }";
```

Parquet files created with this schema will contain meta information like:

```text
creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:   parquet.avro.schema = {"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:   writer.model.name = avro
...
favoriteColor: BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAI
```

With the `User` class defined in the package `org.apache.flink.formats.parquet.avro`:

```java
public class User {
    private String name;
    private Integer favoriteNumber;
    private String favoriteColor;

    public User() {}

    public User(String name, Integer favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
        this.favoriteColor = favoriteColor;
    }

    public String getName() {
        return name;
    }

    public Integer getFavoriteNumber() {
        return favoriteNumber;
    }

    public String getFavoriteColor() {
        return favoriteColor;
    }
}
```

You can write the following program to read Avro Reflect records of the `User` type from Parquet files:

```java
final FileSource<User> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<User> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
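
For completeness, here is a hedged sketch of how Parquet files carrying this meta information could be produced from a Flink job, using `AvroParquetWriters` from `flink-parquet` together with the `FileSink` (the path placeholder follows the convention of the examples above):

```java
// The reflection-derived Avro schema takes its namespace from the package
// of the User class, so the written files satisfy the prerequisite above.
final FileSink<User> sink =
        FileSink.forBulkFormat(
                        /* Flink Path */, AvroParquetWriters.forReflectRecord(User.class))
                .build();

stream.sinkTo(sink);
```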