Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions docs/layouts/shortcodes/generated/pulsar_table_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>admin-url</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Pulsar service HTTP URL for the admin endpoint. For example, <code class="highlighter-rouge">http://my-broker.example.com:8080</code>, or <code class="highlighter-rouge">https://my-broker.example.com:8443</code> for TLS.</td>
</tr>
<tr>
<td><h5>explicit</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicate if the table is an explicit Flink table.</td>
</tr>
<tr>
<td><h5>key.fields</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>An explicit list of physical columns from the table schema that are decoded/encoded from the key bytes of a Pulsar message. By default, this list is empty and thus a key is undefined.</td>
</tr>
<tr>
<td><h5>key.format</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format identifier for decoding/encoding key bytes in Pulsar message. The identifier is used to discover a suitable format factory.</td>
</tr>
<tr>
<td><h5>service-url</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Service URL provider for Pulsar service.<br />To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.<br />You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.<br /><ul><li>This is an example of <code class="highlighter-rouge">localhost</code>: <code class="highlighter-rouge">pulsar://localhost:6650</code>.</li><li>If you have multiple brokers, the URL is as: <code class="highlighter-rouge">pulsar://localhost:6550,localhost:6651,localhost:6652</code></li><li>A URL for a production Pulsar cluster is as: <code class="highlighter-rouge">pulsar://pulsar.us-west.example.com:6650</code></li><li>If you use TLS authentication, the URL is as <code class="highlighter-rouge">pulsar+ssl://pulsar.us-west.example.com:6651</code></li></ul></td>
</tr>
<tr>
<td><h5>sink.custom-topic-router</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>(Optional) the custom topic router class URL that is used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink). If this option is provided, the <code class="highlighter-rouge">sink.topic-routing-mode</code> option will be ignored.</td>
</tr>
<tr>
<td><h5>sink.message-delay-interval</h5></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>(Optional) the message delay delivery interval that is used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).</td>
</tr>
<tr>
<td><h5>sink.topic-routing-mode</h5></td>
<td style="word-wrap: break-word;">round-robin</td>
<td><p>Enum</p></td>
<td>(Optional) the topic routing mode. Available options are <code class="highlighter-rouge">round-robin</code> and <code class="highlighter-rouge">message-key-hash</code>. By default, it is set to <code class="highlighter-rouge">round-robin</code>. If you want to use a custom topic router, use the <code class="highlighter-rouge">sink.custom-topic-router</code> option to determine the partition for a particular message.<br /><br />Possible values:<ul><li>"round-robin": The producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput. Please note that round-robin is not done per individual message but rather it's set to the same boundary of <code class="highlighter-rouge">pulsar.producer.batchingMaxMessages</code>, to ensure batching is effective.</li><li>"message-key-hash": If no key is provided, The partitioned producer will randomly pick one single topic partition and publish all the messages into that partition. If a key is provided on the message, the partitioned producer will hash the key and assign the message to a particular partition.</li><li>"custom": Use custom <code class="highlighter-rouge">TopicRouter</code> implementation that will be called to determine the partition for a particular message.</li></ul></td>
</tr>
<tr>
<td><h5>source.start.message-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional message id used to specify a consuming starting point for source. Use <code class="highlighter-rouge">earliest</code>, <code class="highlighter-rouge">latest</code> or pass in a message id representation in <code class="highlighter-rouge">ledgerId:entryId:partitionId</code>, such as <code class="highlighter-rouge">12:2:-1</code></td>
</tr>
<tr>
<td><h5>source.start.publish-time</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>(Optional) the publish timestamp that is used to specify a starting point for the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source) to consume data.</td>
</tr>
<tr>
<td><h5>source.stop.after-message-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional message id used to specify a stop position but include the given message in the consuming result for the unbounded sql source. Pass in a message id representation in "ledgerId:entryId:partitionId", such as "12:2:-1". </td>
</tr>
<tr>
<td><h5>source.stop.at-message-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional message id used to specify a stop cursor for the unbounded sql source. Use "never", "latest" or pass in a message id representation in "ledgerId:entryId:partitionId", such as "12:2:-1"</td>
</tr>
<tr>
<td><h5>source.stop.at-publish-time</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional publish timestamp used to specify a stop cursor for the unbounded sql source.</td>
</tr>
<tr>
<td><h5>source.subscription-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The subscription name of the consumer that is used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). This argument is required for constructing the consumer.</td>
</tr>
<tr>
<td><h5>source.subscription-type</h5></td>
<td style="word-wrap: break-word;">Exclusive</td>
<td><p>Enum</p></td>
<td>The [subscription type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions) that is supported by the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). Currently, only <code class="highlighter-rouge">Exclusive</code> and <code class="highlighter-rouge">Shared</code> subscription types are supported.<br /><br />Possible values:<ul><li>"Exclusive"</li><li>"Shared"</li><li>"Failover"</li><li>"Key_Shared"</li></ul></td>
</tr>
<tr>
<td><h5>topics</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Topic name(s) the table reads data from. It can be a single topic name or a list of topic names separated by a semicolon symbol (<code class="highlighter-rouge">;</code>) like <code class="highlighter-rouge">topic-1;topic-2</code>.</td>
</tr>
<tr>
<td><h5>value.format</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format identifier for decoding/encoding value data. The identifier is used to discover a suitable format factory.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,11 @@ org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not sa
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.pulsar.table.PulsarTableITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
98 changes: 98 additions & 0 deletions flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ under the License.
<version>${project.version}</version>
</dependency>

<!-- Table ecosystem -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Connectors -->

<dependency>
Expand All @@ -74,6 +104,22 @@ under the License.
<optional>true</optional>
</dependency>

<!-- Format support -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Tests -->

<dependency>
Expand All @@ -99,6 +145,50 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Pulsar Table Test Dependencies-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Pulsar SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar testing environment -->

<dependency>
Expand Down Expand Up @@ -221,6 +311,14 @@ under the License.
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- We pick an arbitrary version of net.java.dev.jna:jna to satisfy dependency
convergence for org.testcontainers:testcontainers which transitively depends on
two different versions.-->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.5.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.connector.pulsar.common.schema.factories;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.AvroUtils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
Expand Down Expand Up @@ -45,4 +48,14 @@ public Schema<T> createSchema(SchemaInfo info) {

return AvroSchema.of(definition);
}

@Override
public TypeInformation<T> createTypeInfo(SchemaInfo info) {
try {
Class<T> decodeClassInfo = decodeClassInfo(info);
return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
} catch (Exception e) {
return super.createTypeInfo(info);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.connector.pulsar.common.schema.factories;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.AvroUtils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand All @@ -38,4 +41,14 @@ public Schema<T> createSchema(SchemaInfo info) {
Class<T> typeClass = decodeClassInfo(info);
return JSONSchema.of(typeClass, info.getProperties());
}

@Override
public TypeInformation<T> createTypeInfo(SchemaInfo info) {
try {
Class<T> decodeClassInfo = decodeClassInfo(info);
return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
} catch (Exception e) {
return super.createTypeInfo(info);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ public Schema<KeyValue<K, V>> createSchema(SchemaInfo info) {
public TypeInformation<KeyValue<K, V>> createTypeInfo(SchemaInfo info) {
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = decodeKeyValueSchemaInfo(info);

Schema<K> keySchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getKey());
Class<K> keyClass = decodeClassInfo(keySchema.getSchemaInfo());

Schema<V> valueSchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getValue());
Class<V> valueClass = decodeClassInfo(valueSchema.getSchemaInfo());
Class<K> keyClass = decodeClassInfo(kvSchemaInfo.getKey());
Class<V> valueClass = decodeClassInfo(kvSchemaInfo.getValue());

Schema<KeyValue<K, V>> schema = createSchema(info);
PulsarSchema<KeyValue<K, V>> pulsarSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -133,9 +134,16 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);

// Choose the right topic to send.
String key = message.getKey();

List<String> availableTopics = metadataListener.availableTopics();
String topic = topicRouter.route(element, key, availableTopics, sinkContext);
String keyString;
// TODO if both keyBytes and key are set, use keyBytes. This is a temporary solution.
if (message.getKeyBytes() == null) {
keyString = message.getKey();
} else {
keyString = Base64.getEncoder().encodeToString(message.getKeyBytes());
}
String topic = topicRouter.route(element, keyString, availableTopics, sinkContext);

// Create message builder for sending message.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
Expand Down Expand Up @@ -206,6 +214,11 @@ private TypedMessageBuilder<?> createMessageBuilder(
builder.key(key);
}

byte[] keyBytes = message.getKeyBytes();
if (keyBytes != null) {
builder.keyBytes(keyBytes);
}

long eventTime = message.getEventTime();
if (eventTime > 0) {
builder.eventTime(eventTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class PulsarMessage<T> {

@Nullable private final byte[] orderingKey;
@Nullable private final String key;
@Nullable private final byte[] keyBytes;
private final long eventTime;
private final Schema<T> schema;
@Nullable private final T value;
Expand All @@ -49,6 +50,7 @@ public class PulsarMessage<T> {
PulsarMessage(
@Nullable byte[] orderingKey,
@Nullable String key,
@Nullable byte[] keyBytes,
long eventTime,
Schema<T> schema,
@Nullable T value,
Expand All @@ -58,6 +60,7 @@ public class PulsarMessage<T> {
boolean disableReplication) {
this.orderingKey = orderingKey;
this.key = key;
this.keyBytes = keyBytes;
this.eventTime = eventTime;
this.schema = schema;
this.value = value;
Expand All @@ -77,6 +80,11 @@ public String getKey() {
return key;
}

@Nullable
public byte[] getKeyBytes() {
return keyBytes;
}

public long getEventTime() {
return eventTime;
}
Expand Down
Loading