
Conversation

@dlg99 (Contributor) commented Mar 6, 2021

Motivation

Provide a way to use a Kafka Connect sink as a Pulsar sink, for cases such as:

  • a company has a custom Kafka sink and wants to try out Pulsar
  • no corresponding Pulsar sink exists
  • etc.

Modifications

Added "kafka producer" that uses kafka sink to dump data to the 3rd system bypassing kafka.
Added configuration options (kafkaConnectorSinkClass, kafkaConnectorConfigProperties) for the pulsar-kafka sink
Split pulsar-io/kafka module into pulsar-io/kafka (builds jar) and pulsar-io/kafka-nar (builds nar)
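
A rough, illustrative sketch of the wrapping idea (the class and method names below are assumptions for illustration, not the PR's actual implementation):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical wrapper: load the class named by kafkaConnectorSinkClass,
// start its task, and feed it Pulsar payloads as Kafka Connect SinkRecords.
public class KafkaConnectSinkSketch {
    private SinkConnector connector;
    private SinkTask task;

    public void open(String sinkClass, Map<String, String> connectorConfig) throws Exception {
        connector = (SinkConnector) Class.forName(sinkClass).getDeclaredConstructor().newInstance();
        connector.start(connectorConfig); // kafkaConnectorConfigProperties
        task = (SinkTask) connector.taskClass().getDeclaredConstructor().newInstance();
        task.start(connector.taskConfigs(1).get(0));
    }

    public void write(String topic, byte[] value, long offset) {
        // No Kafka broker involved: the record goes straight to the Connect sink task.
        SinkRecord record = new SinkRecord(topic, 0,
                Schema.STRING_SCHEMA, null,
                Schema.BYTES_SCHEMA, value,
                offset);
        task.put(Collections.singletonList(record));
    }

    public void close() {
        task.stop();
        connector.stop();
    }
}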

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Added a unit test.
Tested locally as follows:

Ran Pulsar standalone.
Built pulsar-io/kafka-nar with mvn clean package -DskipTests -P packageKafkaConnect to include Kafka's connect-file sink in the nar.
Ran the test nar as

bin/pulsar-admin sinks localrun -a ~/pulsar-io-kafka-nar-2.8.0-SNAPSHOT.nar --name kwrap --namespace public/default/ktest --parallelism 1 -i my-topic --sink-config-file ~/sink.yaml

with

$ cat ~/sink.yaml
configs:
  "topic": "test"
  "offsetStorageTopic": "kafka-connect-sink-offset"
  "pulsarServiceUrl": "pulsar://localhost:6650/"
  "kafkaConnectorSinkClass": "org.apache.kafka.connect.file.FileStreamSinkConnector"
  "defaultKeySchema": "STRING_SCHEMA"
  "defaultValueSchema": "BYTES_SCHEMA"
  "kafkaConnectorConfigProperties":
    "linger.ms": "10000"
    "batch.size": "3"
    "file": "/tmp/sink_test.out"

Produced a message as

bin/pulsar-client produce my-topic --messages "hello-pulsar"

and got

$ cat /tmp/sink_test.out
[B@242e2d8b

which is as expected ([B@... comes from FileSink writing the byte array as a string; this is the current expected behavior).
Schema support is still needed in Pulsar's Kafka sink to handle non-byte[] data; TBD.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented yet)
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
    TBD following review

@sijie sijie requested review from jerrypeng and srkukarni March 8, 2021 03:05
@sijie sijie added this to the 2.8.0 milestone Mar 8, 2021
@sijie (Member) commented Mar 8, 2021

@jerrypeng @srkukarni @freeznet @nlu90 Can you review this pull request?

@sijie (Member) left a comment

@dlg99

I think you are putting the implementation in the wrong package. pulsar-io/kafka is used for the Pulsar source/sink that interact with Kafka.

The kafka-connect-adaptor module is used for adapting Kafka Connect sinks/sources. We should put the Kafka sink connector adapter in pulsar-io/kafka-connect-adaptor.


<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
Member

I think this is a breaking change, no?

Contributor Author

@sijie I might be missing some context: what could it break? Only the nar file name/source path changed.
A similar change is #9808; am I missing context that would help me understand why that one was safe and this one is not?
I relied on the integration tests to catch potential breakage. Please let me know if there is something the tests won't catch, and how I can confirm that everything works in this case.

Member

I missed #9808. There is no breaking change here.

#9808 is fine because the adaptor is used both as a standalone connector and as a dependency of the Debezium connector. This goes to my major comment: I don't think you should put the Kafka sink connector wrapper in the Kafka connector. It should go into the pulsar-io-kafka-connect-adaptor module.

Contributor Author

@sijie moved to another module


<build>
<plugins>
<plugin>
Member

Isn't this a breaking change?

Contributor Author

see above

.forEach(kv -> props.put(kv.getKey(), kv.getValue()));

producer = new KafkaProducer<>(beforeCreateProducer(props));
// todo: schemas from config
Member

Please create a Github issue and link the issue here.

Contributor Author

fixed


@Override
public KeyValue<Schema, Schema> extractKeyValueSchemas(Record<byte[]> record) {
Schema keySchema = Schema.STRING_SCHEMA;
Member

Any reason why you use STRING_SCHEMA and not BYTES_SCHEMA?

Contributor Author

@sijie This is the type of the original key (which is always a String in Pulsar).
KafkaProducer does serialization on send, using the configured serializers.
KafkaSinkWrappingProducer simply passes the key and value as-is to the sink, so the String key is passed as a String.
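
For illustration, a plausible shape of that method under the current String-key / byte[]-value assumption (the BYTES_SCHEMA default for the value is an assumption for this sketch, not quoted from the PR):

@Override
public KeyValue<Schema, Schema> extractKeyValueSchemas(Record<byte[]> record) {
    // Pulsar record keys are Strings, so the key schema stays STRING_SCHEMA;
    // the payload arrives as byte[], so BYTES_SCHEMA is the natural value default.
    Schema keySchema = Schema.STRING_SCHEMA;
    Schema valueSchema = Schema.BYTES_SCHEMA;
    return new KeyValue<>(keySchema, valueSchema);
}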

Member

Pulsar already supports bytes through keyBytes. We should use that instead of using STRING.

Contributor Author

The method accepts a Record&lt;byte[]&gt;, where keyBytes does not exist.

Contributor

I believe that this is a preliminary PR; currently we are only supporting String -> byte[].

The next step will be to fully support Schema and KeyValue.

public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
sinkContext.throwIfNeeded();
task.put(Lists.newArrayList(toSinkRecord(producerRecord)));
return CompletableFuture.completedFuture(null);
Contributor

Is it possible to return a CompletableFuture that completes when the SinkRecord has been successfully processed?

Otherwise Pulsar thinks that the record has been correctly "sent" (processed), but this is not true.

The same comment applies to the other methods below.

Contributor Author

@eolivelli I'll add flush() immediately after task.put() to guarantee that for now.
The proper way would be to implement batching support (with policies based on time/number of pending records), but I'd prefer to postpone that as a "performance improvement" until we agree on functional completeness and/or hear feedback that it is needed.

Contributor Author

Added batch/linger support; that should cover this. A rough sketch of the flush logic is below.
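
Roughly, the batching logic could look like the following sketch (pendingRecords and currentOffsets() are assumed names, not taken from the PR):

private void flushIfNeeded(boolean force) {
    // Called by the linger.ms timer (force = true) and after each put()
    // once the pending batch reaches batch.size.
    if (force || pendingRecords.size() >= batchSize) {
        task.flush(currentOffsets()); // hand the accumulated records to the Connect sink task
        pendingRecords.clear();
    }
}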

@dlg99 dlg99 marked this pull request as draft March 8, 2021 23:24
@dlg99 dlg99 marked this pull request as ready for review March 11, 2021 00:13
@dlg99 (Contributor Author) commented Mar 11, 2021

@sijie , @eolivelli - can you take another look please?

private final PulsarKafkaSinkContext sinkContext;
private final PulsarKafkaSinkTaskContext taskContext;
private final int batchSize;
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Contributor

Can we give this a name?

this.batchSize = getBatchSize(props);

long lingerMs = getLingerMs(props);
scheduledExecutor.scheduleAtFixedRate(() -> this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS);
Contributor

We are not shutting down this scheduledExecutor.
Can you please shut it down in the close method?
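
For example, close() could stop the timer before tearing down the task (a sketch; the field names are assumed from the quoted snippet):

@Override
public void close() {
    // Stop the linger timer first so no flush races with shutdown.
    scheduledExecutor.shutdown();
    try {
        scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
    task.stop();
}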

}

@Override
public List<PartitionInfo> partitionsFor(String topic) {
Contributor

Do we need to implement this method?
If we are not calling it from our KafkaSink, then we could simply throw UnsupportedOperationException.

@Override
public Map<MetricName, ? extends Metric> metrics() {
sinkContext.throwIfNeeded();
return null;
Contributor

What about returning an empty map or throwing UnsupportedOperationException?
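
For instance, keeping the quoted signature but returning an empty map (a sketch):

@Override
public Map<MetricName, ? extends Metric> metrics() {
    sinkContext.throwIfNeeded();
    // An empty map is safer than null for callers that iterate over the metrics.
    return Collections.emptyMap();
}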

final Schema keySchema;
final Schema valueSchema;

public ProducerRecordWithSchema(String topic, Integer partition, Long timestamp, K key, V value,
Contributor

Do we need to implement all of these constructors?
Probably we need only one.

return snapshot;
}

private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
Contributor

static ?

Contributor Author

It uses topicNamespace. I could make it static and pass topicNamespace as a parameter, but I don't see what that would improve.

Contributor

Sorry. I missed that

super.internalSetup();
super.producerBaseSetup();

file = Paths.get(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
Contributor

What about Files.createTempDirectory?
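
For example, the JDK helper could replace the hand-built path (assuming the test only needs a single temporary output file):

file = Files.createTempFile("pulsar-kafka-sink-test", ".out");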

@Override
protected void cleanup() throws Exception {
if (file != null) {
//Files.delete(file);
Contributor

Please uncomment this line.

status.incrementAndGet();
} else {
System.out.println(exception.toString());
exception.printStackTrace();
Contributor

Nit: use a logger?

public void write(Record<byte[]> sourceRecord) {
KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue());
KeyValue<Schema, Schema> keyValueSchemas = extractKeyValueSchemas(sourceRecord);
Contributor

Using a KeyValue&lt;Schema, Schema&gt; here may be misleading; what about using a "Pair"?

I mean that KeyValue is usually the class we use as the content of the Record; here you are using it as a simple Pair, to return two objects from the extractKeyValueSchemas method.

@eolivelli (Contributor) left a comment

LGTM

@sijie please take a look

@dlg99 (Contributor Author) commented Mar 12, 2021

/pulsarbot run-failure-checks

@sijie (Member) left a comment

I am really confused by what this PR is trying to do. If the goal is wrapping a Kafka sink as a Pulsar sink, I would expect to see a class called KafkaSinkConnector, similar to what we did to wrap the Kafka source into KafkaSourceConnector (https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java).

I see many other unrelated changes, e.g. adding a new kafka-nar module (which is not necessary), or adding support for writing records with schemas. I am lost reviewing this pull request. Please clarify what you are trying to do here.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.*;
Member

avoid importing *


private static long getLingerMs(Properties props) {
long lingerMs = 2147483647L; // as in kafka
final String lingerPropName = "linger.ms";
Member

Kafka has constants for these settings. We should use the Kafka constants instead of defining those strings again.
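
For example, Kafka's ProducerConfig already defines these keys:

import org.apache.kafka.clients.producer.ProducerConfig;

// Kafka's own constants for the settings re-declared above:
final String lingerPropName = ProducerConfig.LINGER_MS_CONFIG;      // "linger.ms"
final String batchSizePropName = ProducerConfig.BATCH_SIZE_CONFIG;  // "batch.size"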


@Override
public void raiseError(Exception e) {
log.warn("raiseError called", lastException);
Member

Please improve the log statement. The log statement here provides useless information.

</parent>

<artifactId>pulsar-io-kafka-nar</artifactId>
<name>Pulsar IO :: Kafka NAR</name>
Member

I feel this change is unrelated to this PR. pulsar-io-kafka is a connector, not a library shared by other modules. Introducing a new module will increase the build time, so I suggest removing this change.

public void write(Record<byte[]> sourceRecord) {
KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue());
Pair<Schema, Schema> keyValueSchemas = extractKeyValueSchemas(sourceRecord);
Member

I don't think this change is related to the adaptor change. Can you explain why you put the change here?

@dlg99 (Contributor Author) commented Mar 15, 2021

@sijie The goal is to use a Kafka Connect sink inside a Pulsar sink. There is already Pulsar's KafkaBytesSink (in the pulsar-io/kafka module) that writes data to Kafka.
Instead of adding another sink and later dealing with potentially duplicate changes (e.g. around Schema), this change wraps the Kafka Connect sink in the Kafka producer API and makes the existing Pulsar sink use it.
A Kafka Connect sink accepts (through the SinkRecord) and uses Schemas to send data to the third-party system, so there is no need to serialize the data beforehand.
Schemas will also be used once @eolivelli's changes unblock the implementation of Sink&lt;Object&gt;.

The nar module split does not affect anything functionally, but it

  • avoids building the nar file when I simply want to build the module / run tests (mvn clean install)
  • simplifies the dependency definitions in the added profile. There is a default profile (as before) and one that specifies the Kafka Connect module to include in the nar.

@dlg99 (Contributor Author) commented Mar 16, 2021

Abandoned in favor of #9927.

@dlg99 dlg99 closed this Mar 16, 2021
@dlg99 dlg99 deleted the kafka-connect branch October 14, 2021 23:29