Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

/**
* key value schema record.
* Key value schema record.
* You can use this interface on Pulsar IO Sources in order to write KeyValue messages.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface KVRecord<K, V> extends Record {
public interface KVRecord<K, V> extends Record<KeyValue<K,V>> {

Schema<K> getKeySchema();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -342,7 +345,14 @@ private void sendOutputMessage(Record srcRecord, Object output) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
SinkRecord sinkRecord;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why do you need this change here. What is the value for adding a SinkKVRecord? SinkRecord is a wrapper of the output object. The output object can be a KeyValue object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie
these are the key points:

  1. on the Source side you use KVRecord for records with a KeyValue payload, this lets you specify the keySchema and the valueSchema, for consistency it is good to have the same situation on the Sink side
  2. KeyValueSchema is not part of the public api (it is in the impl package), so in theory you cannot get the keySchema and the valueSchema when you are inside a Sink, if we implement KVRecord then you have those schemas
  3. When we have AutoConsumeSchema that returns a KeyValue object the record.getSchema() is the internal class AutoConsumeSchema, so you cannot still get the keySchema and the valueSchema (this problem is still not present, because we have not merged the Sink<GenericObject patch yet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie the alternative is:

  • make KeyValueSchema part of the "public" API (or add an interface on the "api" package)
  • hide "AutoConsumeSchema" in the Sink side and make Record#getSchema() return the current KeyValueSchema

but this alternative is not good because:

  • we are going to add a new API to represent KeyValueSchema, but we already have KVRecord
  • we are changing the behaviour of the Sink, not returning AutoConsumeSchema anymore

if (output instanceof KeyValue
&& srcRecord.getSchema() instanceof KeyValueSchema) {
sinkRecord = new SinkKVRecord(srcRecord, (KeyValue) output);
} else {
sinkRecord = new SinkRecord<>(srcRecord, output);
}
this.sink.write(sinkRecord);
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;

public class SinkKVRecord<K,V> extends SinkRecord<KeyValue<K,V>> implements KVRecord<K,V> {

public SinkKVRecord(Record<KeyValue<K, V>> sourceRecord, KeyValue<K, V> value) {
super(sourceRecord, value);
}

@Override
public Schema<K> getKeySchema() {
return ((KeyValueSchema) this.getSchema()).getKeySchema();
}

@Override
public Schema<V> getValueSchema() {
return ((KeyValueSchema) this.getSchema()).getValueSchema();
}

@Override
public KeyValueEncodingType getKeyValueEncodingType() {
return ((KeyValueSchema) this.getSchema()).getKeyValueEncodingType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void close() {
private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
private static long FLUSH_TIMEOUT_MS = 2000;

public abstract class AbstractKafkaSourceRecord<T> implements Record {
public abstract class AbstractKafkaSourceRecord<T> implements Record<T> {
@Getter
Optional<String> key;
@Getter
Expand All @@ -196,7 +196,7 @@ public abstract class AbstractKafkaSourceRecord<T> implements Record {
}

@Override
public Schema getSchema() {
public Schema<T> getSchema() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public Schema<V> getSchema() {
return schema;
}
}
protected static class KeyValueKafkaRecord<V> extends KafkaRecord implements KVRecord<Object, Object> {
protected static class KeyValueKafkaRecord<V> extends KafkaRecord<KeyValue<Object, Object>> implements KVRecord<Object, Object> {

private final Schema<Object> keySchema;
private final Schema<Object> valueSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class TestKeyValueSink implements Sink<KeyValue<String, Integer>> {

@Override
public void open(Map<String, Object> config, SinkContext sourceContext) throws Exception {
}

public void write(Record<KeyValue<String, Integer>> record) {
log.info("write {} {} {}", record, record.getClass(), record.getSchema());
if (!(record instanceof KVRecord)) {
throw new RuntimeException("Expected a KVRecord, but got a "+record.getClass());
}
KVRecord<String, Integer> kvRecord = (KVRecord<String, Integer>) record;
if (kvRecord.getKeySchema().getSchemaInfo().getType() != SchemaType.STRING) {
throw new RuntimeException("Expected a String key schema but it was a "+kvRecord.getKeySchema());
}

if (kvRecord.getValueSchema().getSchemaInfo().getType() != SchemaType.INT32) {
throw new RuntimeException("Expected a Integer key schema but it was a "+kvRecord.getValueSchema());
}
}
@Override
public void close() throws Exception {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Test behaviour of simple sinks
*/
@Slf4j
public class PulsarKeyValueSinkTest extends PulsarStandaloneTestSuite {

@Test(groups = {"sink"})
public void testSourceProperty() throws Exception {
String outputTopicName = "test-kv-sink-input-" + randomName(8);
String sinkName = "test-kv-sink-" + randomName(8);
submitSinkConnector(sinkName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestKeyValueSink", JAVAJAR);

// get sink info
getSinkInfoSuccess(sinkName);

// get source status
getSinkStatus(sinkName);

@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(container.getPlainTextServiceUrl())
.build();
@Cleanup Producer<KeyValue<String, Integer>> producer = client.newProducer(Schema.KeyValue(String.class, Integer.class))
.topic(outputTopicName)
.create();

producer.send(new KeyValue<>("foo", 1234));
producer.send(new KeyValue<>("foo", 1234));
producer.send(new KeyValue<>("foo", 1234));
producer.send(new KeyValue<>("foo", 1234));

try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {

Awaitility.await().ignoreExceptions().untilAsserted(() -> {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar > 0);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
});
}


// delete source
deleteSink(sinkName);

getSinkInfoNotFound(sinkName);
}

private void submitSinkConnector(String sinkName,
String inputTopicName,
String className,
String archive) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sinks", "create",
"--name", sinkName,
"-i", inputTopicName,
"--archive", archive,
"--classname", className
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = container.execCmd(commands);
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
}

private void getSinkInfoSuccess(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
}

private void getSinkStatus(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("\"running\" : true"));
}

private void deleteSink(String sinkName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName
);
assertTrue(result.getStdout().contains("successfully"));
result.assertNoStderr();
}

private void getSinkInfoNotFound(String sinkName) throws Exception {
try {
container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sinks",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sinkName);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
assertTrue(e.getResult().getStderr().contains(sinkName + " doesn't exist"));
}
}
}

1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-function.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
<class name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
<class name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest"/>
<class name="org.apache.pulsar.tests.integration.io.PulsarKeyValueSinkTest"/>
</classes>
</test>
</suite>