Merged
7 changes: 5 additions & 2 deletions .github/workflows/run-pr-check.yml
@@ -12,9 +12,12 @@ jobs:
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
with:
maven-version: 3.6.2
- name: Run Unit Test and Install
run: |
./mvnw -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' \
mvn -ntp -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' \
clean install

13 changes: 13 additions & 0 deletions docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -155,6 +155,19 @@ Pulsar Source provides two ways to subscribe to topics or topic partitions.
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
```

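If you use a KeyValue or Struct type Schema, the Pulsar `Schema` will not contain type class information, but `PulsarSchemaTypeInformation` needs that type class information to be constructed. The API therefore lets users pass in the type class information.

An example: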

```java
// Primitive types: no type class info is needed
PulsarDeserializationSchema.pulsarSchema(Schema.INT32);
// Struct types (JSON, Protobuf, Avro, etc.)
PulsarDeserializationSchema.pulsarSchema(Schema.AVRO(SomeClass.class), SomeClass.class);
// KeyValue type
PulsarDeserializationSchema.pulsarSchema(Schema.KeyValue(Schema.INT32, Schema.AVRO(SomeClass.class)), Integer.class, SomeClass.class);
```

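Pulsar's `Message<byte[]>` contains many [extra properties](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF), such as the message key, the message publish time, the message event time, and user-defined key/value properties. You can use the `Message<byte[]>` interface to access these properties.

If you need to parse a message based on these extra properties, you can implement the `PulsarDeserializationSchema` interface. Make sure that the `TypeInformation` returned by `PulsarDeserializationSchema.getProducedType()` is correct. Flink uses this `TypeInformation` to serialize the parsed result and pass it to downstream operators.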
15 changes: 15 additions & 0 deletions docs/content/docs/connectors/datastream/pulsar.md
@@ -176,6 +176,21 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
```java
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
```
If you use the KeyValue type or a Struct type, the Pulsar `Schema` does not contain the type class info that
`PulsarSchemaTypeInformation` needs. So the `pulsarSchema` overloads take additional parameters for passing the type class info.

An example would be:

```java
// Primitive types: do not need to provide type class info
PulsarDeserializationSchema.pulsarSchema(Schema.INT32);

// Struct types (JSON, Protobuf, Avro, etc.)
PulsarDeserializationSchema.pulsarSchema(Schema.AVRO(SomeClass.class), SomeClass.class);

// KeyValue type
PulsarDeserializationSchema.pulsarSchema(Schema.KeyValue(Schema.INT32, Schema.AVRO(SomeClass.class)), Integer.class, SomeClass.class);
```
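
The need for the extra class arguments can be pictured with a small, Pulsar-free sketch. `SchemaLike`, `TypeInfoLike`, and `typeInfoFor` below are hypothetical stand-ins, not connector or Pulsar APIs. They only illustrate that a decoder typed as `SchemaLike<T>` carries no `Class<T>` object, so the caller has to supply one:

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Illustrative sketch only: none of these types exist in Flink or Pulsar.
final class TypeClassSketch {

    /** Stand-in for a schema: it can decode bytes but holds no Class<T>. */
    interface SchemaLike<T> {
        T decode(byte[] bytes);
    }

    /** Stand-in for type information that must know the concrete class. */
    static final class TypeInfoLike<T> {
        final Class<T> typeClass;

        TypeInfoLike(Class<T> typeClass) {
            this.typeClass = Objects.requireNonNull(typeClass);
        }
    }

    /** The class is passed next to the schema because the schema cannot provide it. */
    static <T> TypeInfoLike<T> typeInfoFor(SchemaLike<T> schema, Class<T> typeClass) {
        Objects.requireNonNull(schema);
        return new TypeInfoLike<>(typeClass);
    }

    public static void main(String[] args) {
        SchemaLike<String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        TypeInfoLike<String> info = typeInfoFor(utf8, String.class);
        System.out.println(info.typeClass.getName()); // prints "java.lang.String"
    }
}
```

For a KeyValue schema the same idea applies twice, once for the key class and once for the value class.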

Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
such as message key, message publish time, message time, and application-defined key/value pairs etc.
@@ -20,6 +20,12 @@
<td>Boolean</td>
<td>Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to <code class="highlighter-rouge">true</code>.<br />The source would use pulsar client's internal mechanism and commit cursor in two ways.<ul><li>For <code class="highlighter-rouge">Key_Shared</code> and <code class="highlighter-rouge">Shared</code> subscription, the cursor would be committed once the message is consumed.</li><li>For <code class="highlighter-rouge">Exclusive</code> and <code class="highlighter-rouge">Failover</code> subscription, the cursor would be committed in a given interval.</li></ul></td>
</tr>
<tr>
<td><h5>pulsar.source.enableSchemaEvolution</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If this option is enabled, the source consumes and deserializes messages by using Pulsar's <code class="highlighter-rouge">Schema</code>.</td>
</tr>
<tr>
<td><h5>pulsar.source.maxFetchRecords</h5></td>
<td style="word-wrap: break-word;">100</td>
@@ -0,0 +1,4 @@
#
#Thu May 12 11:35:25 CST 2022
Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=48c7dd05-c840-4ac4-a3ba-919e07191450
ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=a2ce237e-b050-4ba0-8748-d83637a207a8
85 changes: 84 additions & 1 deletion flink-connectors/flink-connector-pulsar/pom.xml
@@ -56,6 +56,34 @@ under the License.
<version>${project.version}</version>
</dependency>

<!-- Table ecosystem -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Connectors -->

<dependency>
@@ -108,6 +136,62 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Pulsar Table Test Dependencies-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Pulsar SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar testing environment -->

<dependency>
@@ -236,7 +320,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
@@ -18,6 +18,9 @@

package org.apache.flink.connector.pulsar.common.schema.factories;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.AvroUtils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -45,4 +48,14 @@ public Schema<T> createSchema(SchemaInfo info) {

return AvroSchema.of(definition);
}

@Override
public TypeInformation<T> createTypeInfo(SchemaInfo info) {
try {
Class<T> decodeClassInfo = decodeClassInfo(info);
return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
} catch (Exception e) {
return super.createTypeInfo(info);
}
}
}
@@ -18,6 +18,9 @@

package org.apache.flink.connector.pulsar.common.schema.factories;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.AvroUtils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -38,4 +41,14 @@ public Schema<T> createSchema(SchemaInfo info) {
Class<T> typeClass = decodeClassInfo(info);
return JSONSchema.of(typeClass, info.getProperties());
}

@Override
public TypeInformation<T> createTypeInfo(SchemaInfo info) {
try {
Class<T> decodeClassInfo = decodeClassInfo(info);
return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
} catch (Exception e) {
return super.createTypeInfo(info);
}
}
}
@@ -69,11 +69,8 @@ public Schema<KeyValue<K, V>> createSchema(SchemaInfo info) {
public TypeInformation<KeyValue<K, V>> createTypeInfo(SchemaInfo info) {
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = decodeKeyValueSchemaInfo(info);

Schema<K> keySchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getKey());
Class<K> keyClass = decodeClassInfo(keySchema.getSchemaInfo());

Schema<V> valueSchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getValue());
Class<V> valueClass = decodeClassInfo(valueSchema.getSchemaInfo());
Class<K> keyClass = decodeClassInfo(kvSchemaInfo.getKey());
Class<V> valueClass = decodeClassInfo(kvSchemaInfo.getValue());

Schema<KeyValue<K, V>> schema = createSchema(info);
PulsarSchema<KeyValue<K, V>> pulsarSchema =
@@ -45,6 +45,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -133,9 +134,16 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);

// Choose the right topic to send.
String key = message.getKey();

List<String> availableTopics = metadataListener.availableTopics();
String topic = topicRouter.route(element, key, availableTopics, sinkContext);
String keyString;
// TODO if both keyBytes and key are set, use keyBytes. This is a temporary solution.
if (message.getKeyBytes() == null) {
keyString = message.getKey();
} else {
keyString = Base64.getEncoder().encodeToString(message.getKeyBytes());
}
String topic = topicRouter.route(element, keyString, availableTopics, sinkContext);

// Create message builder for sending message.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
@@ -206,6 +214,11 @@ private TypedMessageBuilder<?> createMessageBuilder(
builder.key(key);
}

byte[] keyBytes = message.getKeyBytes();
if (keyBytes != null) {
builder.keyBytes(keyBytes);
}

long eventTime = message.getEventTime();
if (eventTime > 0) {
builder.eventTime(eventTime);
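
The key selection in the sink writer change above can be sketched in isolation. `RoutingKeySketch` and `routingKey` are hypothetical names used only for illustration. The only assumption carried over from the diff is that a non-null `keyBytes` takes precedence and is Base64-encoded before being handed to the topic router:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative sketch only: this helper is not part of the connector.
final class RoutingKeySketch {

    /**
     * Returns the string used for topic routing: the Base64 form of keyBytes
     * when it is set, otherwise the plain string key (which may be null).
     */
    static String routingKey(String key, byte[] keyBytes) {
        if (keyBytes == null) {
            return key;
        }
        return Base64.getEncoder().encodeToString(keyBytes);
    }

    public static void main(String[] args) {
        byte[] raw = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(routingKey("ignored", raw)); // prints "YWJj"
        System.out.println(routingKey("plain-key", null)); // prints "plain-key"
    }
}
```

Encoding the bytes gives the router a stable string for arbitrary binary keys, at the cost that a message keyed by bytes and one keyed by the equivalent string are routed as different keys.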
@@ -37,6 +37,7 @@ public class PulsarMessage<T> {

@Nullable private final byte[] orderingKey;
@Nullable private final String key;
@Nullable private final byte[] keyBytes;
private final long eventTime;
private final Schema<T> schema;
@Nullable private final T value;
@@ -49,6 +50,7 @@ public class PulsarMessage<T> {
PulsarMessage(
@Nullable byte[] orderingKey,
@Nullable String key,
@Nullable byte[] keyBytes,
long eventTime,
Schema<T> schema,
@Nullable T value,
@@ -58,6 +60,7 @@ public class PulsarMessage<T> {
boolean disableReplication) {
this.orderingKey = orderingKey;
this.key = key;
this.keyBytes = keyBytes;
this.eventTime = eventTime;
this.schema = schema;
this.value = value;
@@ -77,6 +80,11 @@ public String getKey() {
return key;
}

@Nullable
public byte[] getKeyBytes() {
return keyBytes;
}

public long getEventTime() {
return eventTime;
}
@@ -36,6 +36,7 @@ public class PulsarMessageBuilder<T> {

private byte[] orderingKey;
private String key;
private byte[] keyBytes;
private long eventTime;
Schema<T> schema;
private T value;
@@ -59,6 +60,15 @@ public PulsarMessageBuilder<T> key(String key) {
return this;
}

/**
* Method wrapper of {@link TypedMessageBuilder#keyBytes(byte[])}. The key bytes are also used
* by {@link KeyHashTopicRouter}.
*/
public PulsarMessageBuilder<T> keyBytes(byte[] keyBytes) {
this.keyBytes = checkNotNull(keyBytes);
return this;
}

/** Method wrapper of {@link TypedMessageBuilder#eventTime(long)}. */
public PulsarMessageBuilder<T> eventTime(long eventTime) {
this.eventTime = eventTime;
@@ -116,6 +126,7 @@ public PulsarMessage<T> build() {
return new PulsarMessage<>(
orderingKey,
key,
keyBytes,
eventTime,
schema,
value,
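
Builder setters such as `key` and `keyBytes` are normally chained, which only works when each setter returns the builder itself. The following is a minimal sketch of that pattern with hypothetical `Message` and `Builder` classes. It is not the connector's `PulsarMessageBuilder`, and it uses `Objects.requireNonNull` as a stand-in for `checkNotNull`:

```java
import java.util.Objects;

// Illustrative sketch only: simplified stand-ins for the message and its builder.
final class FluentBuilderSketch {

    static final class Message {
        final String key;
        final byte[] keyBytes;

        Message(String key, byte[] keyBytes) {
            this.key = key;
            this.keyBytes = keyBytes;
        }
    }

    static final class Builder {
        private String key;
        private byte[] keyBytes;

        Builder key(String key) {
            this.key = Objects.requireNonNull(key);
            return this; // returning the builder keeps the call chain alive
        }

        Builder keyBytes(byte[] keyBytes) {
            this.keyBytes = Objects.requireNonNull(keyBytes);
            return this;
        }

        Message build() {
            return new Message(key, keyBytes);
        }
    }

    public static void main(String[] args) {
        Message message = new Builder().key("k").keyBytes(new byte[] {1, 2}).build();
        System.out.println(message.key + " / " + message.keyBytes.length); // prints "k / 2"
    }
}
```

A setter that returned `null` instead would make the second call in such a chain throw a `NullPointerException`.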
@@ -94,7 +94,7 @@ public final class PulsarSource<OUT>
* The constructor for PulsarSource, it's package protected for forcing using {@link
* PulsarSourceBuilder}.
*/
PulsarSource(
public PulsarSource(
SourceConfiguration sourceConfiguration,
PulsarSubscriber subscriber,
RangeGenerator rangeGenerator,