

<a name="parquet-format"></a>

# Parquet format

Flink supports reading [Parquet](https://parquet.apache.org/) files and producing either {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} or [Avro](https://avro.apache.org/) records.
To use the format, add the `flink-parquet` dependency to your project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>{{< version >}}</version>
</dependency>
```

To read Avro records, you will also need to add the `parquet-avro` dependency:

```xml
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <!-- pick the parquet-avro version matching your environment; 1.12.2 is the version used in the examples below -->
    <version>1.12.2</version>
</dependency>
```

This format is compatible with the new Source that can be used in both batch and streaming execution modes.
Thus, you can use this format for two kinds of data:

- Bounded data: lists all files and reads them all.
- Unbounded data: monitors a directory for new files that appear.

{{< hint info >}}
When you start a File Source it is configured for bounded data by default.
To configure the File Source for unbounded data, you must additionally call
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
{{< /hint >}}

**Vectorized reader**

```java
// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat, Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat, Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
```

**Avro Parquet reader**

```java
// Parquet rows are decoded in batches
FileSource.forRecordStreamFormat(StreamFormat, Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat, Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();
```

{{< hint info >}}
The following examples are all configured for bounded data.
To configure the File Source for unbounded data, you must additionally call
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
{{< /hint >}}

<a name="flink-rowdata"></a>

## Flink RowData

In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
The second boolean instructs the application that the projected Parquet field names are case-sensitive.
There is no watermark strategy defined, as the records do not contain event timestamps.

```java
final LogicalType[] fieldTypes =
        new LogicalType[] {
            new DoubleType(), new IntType(), new VarCharType()
        };

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,
                false,
                true);
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, /* Flink Path */)
                .build();
final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
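Once the stream is created, the projected columns can be read positionally from each `RowData`. The following is a minimal sketch (the positions follow the projected order "f7", "f4", "f99"; the string formatting itself is illustrative only):

```java
// Sketch: access the projected columns by position.
// Position 0 is "f7" (double), position 1 is "f4" (int), position 2 is "f99" (string).
// Nullable columns would additionally need row.isNullAt(pos) checks.
final DataStream<String> formatted =
        stream.map(
                row ->
                        String.format(
                                "f7=%f, f4=%d, f99=%s",
                                row.getDouble(0),
                                row.getInt(1),
                                // getString returns StringData; convert it explicitly
                                row.getString(2).toString()));
```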

## Avro Records

Flink supports producing three types of Avro records by reading Parquet files (see the sketch after this list):

- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Reflect record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
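Each variant corresponds to one factory method on `AvroParquetReaders`. The sketch below is only an overview; `schema`, `Address` and `Datum` refer to the schema and classes introduced in the following subsections:

```java
// Overview sketch: the three AvroParquetReaders factory methods.
StreamFormat<GenericRecord> generic = AvroParquetReaders.forGenericRecord(schema);
StreamFormat<Address> specific = AvroParquetReaders.forSpecificRecord(Address.class);
StreamFormat<Datum> reflect = AvroParquetReaders.forReflectRecord(Datum.class);
```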

### Generic record

Avro schemas are defined using JSON. You can get more information about Avro schemas and types from the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
This example uses an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):

```json lines
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
```
This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) on how to define an Avro schema.

In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.

```java
// parsing avro schema
final Schema schema =
        new Schema.Parser()
                .parse(
                        "{\"type\": \"record\", "
                                + "\"name\": \"User\", "
                                + "\"fields\": [\n"
                                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                                + "    {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                                + "  ]\n"
                                + "    }");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
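With the stream in place, individual fields can be read by name via `GenericRecord#get`. A minimal sketch (the field name assumes the `User` schema above; Avro returns strings as `Utf8`, hence the explicit conversion):

```java
// Sketch: extract the "name" field from each generic record.
final DataStream<String> names =
        stream.map(record -> record.get("name").toString());
```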

### Specific record

Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
Once the classes have been generated, there is no need to use the schema directly in your programs.
You can either use `avro-tools.jar` to generate code manually, or you can use the Avro Maven plugin to perform
code generation on any .avsc files present in the configured source directory. Please refer to
[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.

The following example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):

```json lines
[
  {"namespace": "org.apache.flink.formats.parquet.generated",
    "type": "record",
    "name": "Address",
    "fields": [
      {"name": "num", "type": "int"},
      {"name": "street", "type": "string"},
      {"name": "city", "type": "string"},
      {"name": "state", "type": "string"},
      {"name": "zip", "type": "string"}
    ]
  }
]
```

You will use the Avro Maven plugin to generate the `Address` Java class:

```java
@org.apache.avro.specific.AvroGenerated
public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    // generated code...
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific records
and then create a DataStream containing Parquet records as Avro Specific records.

```java
final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Address> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

### Reflect record

Beyond Avro Generic and Specific records, which require a predefined Avro schema,
Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
Java types are mapped to Avro schemas; please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.

This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):

```java
public class Datum implements Serializable {
    public String a;
    public int b;

    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        Datum datum = (Datum) o;

        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
    }

    @Override
    public int hashCode() {
        int result = a != null ? a.hashCode() : 0;
        result = 31 * result + b;
        return result;
    }
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect records
and then create a DataStream containing Parquet records as Avro Reflect records.

```java
final FileSource<Datum> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Datum> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

#### Prerequisite for Parquet files

In order to support reading Avro Reflect records, the Parquet file must contain specific meta information.
The Avro schema used for creating the Parquet data must contain a `namespace`,
which will be used by the program to identify the concrete Java class for the reflection process.

The following example shows the `User` schema used previously, but this time with a namespace
pointing to the location (in this case the package) where the `User` class for the reflection can be found.

```java
// avro schema with namespace
final String schema =
        "{\"type\": \"record\", "
                + "\"name\": \"User\", "
                + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
                + "\"fields\": [\n"
                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                + "    {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                + "  ]\n"
                + "    }";

```
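Parquet files carrying this metadata can be produced by any Avro-aware Parquet writer. As a sketch (not part of the original examples; the output path is an assumption), Flink's own `AvroParquetWriters` can be combined with a `FileSink`:

```java
// Sketch: write GenericRecords as Parquet files using the schema defined above.
// The Avro schema, including its namespace, is stored in the file metadata,
// which the reflect reader relies on later.
final FileSink<GenericRecord> sink =
        FileSink.forBulkFormat(
                        new Path("/tmp/parquet-out"), // assumed output path
                        AvroParquetWriters.forGenericRecord(
                                new Schema.Parser().parse(schema)))
                .build();
```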

Parquet files created with this schema will contain meta information like:

```text
creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:          parquet.avro.schema = {"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:          writer.model.name = avro

favoriteColor:  BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED

```

With the `User` class defined in the package `org.apache.flink.formats.parquet.avro`:

```java
public class User {
    private String name;
    private Integer favoriteNumber;
    private String favoriteColor;

    public User() {}

    public User(String name, Integer favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
        this.favoriteColor = favoriteColor;
    }

    public String getName() {
        return name;
    }

    public Integer getFavoriteNumber() {
        return favoriteNumber;
    }

    public String getFavoriteColor() {
        return favoriteColor;
    }
}

```

You can write the following program to read Avro Reflect records of the User type from Parquet files:

```java
final FileSource<User> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<User> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```