diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java index 6f5db3e2c2197..9600783b35a99 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java @@ -21,14 +21,16 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; /** - * key value schema record. + * Key value schema record. + * You can use this interface on Pulsar IO Sources in order to write KeyValue messages. */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface KVRecord extends Record { +public interface KVRecord extends Record> { Schema getKeySchema(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d761ccb108ec0..67e8eddd2d22b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -41,9 +41,12 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; @@ -342,7 +345,14 @@ private void sendOutputMessage(Record srcRecord, Object output) { Thread.currentThread().setContextClassLoader(functionClassLoader); } try { - this.sink.write(new SinkRecord<>(srcRecord, output)); + SinkRecord sinkRecord; + if (output instanceof KeyValue + && srcRecord.getSchema() instanceof KeyValueSchema) { + sinkRecord = new SinkKVRecord(srcRecord, (KeyValue) output); + } else { + sinkRecord = new SinkRecord<>(srcRecord, output); + } + this.sink.write(sinkRecord); } catch (Exception e) { log.info("Encountered exception in sink write: ", e); stats.incrSinkExceptions(e); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkKVRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkKVRecord.java new file mode 100644 index 0000000000000..d89f6d0920cf8 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkKVRecord.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.functions.api.KVRecord; +import org.apache.pulsar.functions.api.Record; + +public class SinkKVRecord extends SinkRecord> implements KVRecord { + + public SinkKVRecord(Record> sourceRecord, KeyValue value) { + super(sourceRecord, value); + } + + @Override + public Schema getKeySchema() { + return ((KeyValueSchema) this.getSchema()).getKeySchema(); + } + + @Override + public Schema getValueSchema() { + return ((KeyValueSchema) this.getSchema()).getValueSchema(); + } + + @Override + public KeyValueEncodingType getKeyValueEncodingType() { + return ((KeyValueSchema) this.getSchema()).getKeyValueEncodingType(); + } +} diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index eab0bca5d973f..8af0eadc8229d 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -170,7 +170,7 @@ public void close() { private static Optional RECORD_SEQUENCE = Optional.empty(); private static long FLUSH_TIMEOUT_MS = 2000; - public abstract class AbstractKafkaSourceRecord implements Record { + public abstract class AbstractKafkaSourceRecord implements Record { @Getter Optional key; @Getter @@ -196,7 +196,7 @@ public abstract class AbstractKafkaSourceRecord implements Record { } @Override - public Schema getSchema() { + public Schema getSchema() { return null; } diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 4daafb11216ec..44bc311ccefdb 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -208,7 +208,7 @@ public Schema getSchema() { return schema; } } - protected static class KeyValueKafkaRecord extends KafkaRecord implements KVRecord { + protected static class KeyValueKafkaRecord extends KafkaRecord> implements KVRecord { private final Schema keySchema; private final Schema valueSchema; diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestKeyValueSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestKeyValueSink.java new file mode 100644 index 0000000000000..892b75c7bcb6c --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestKeyValueSink.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.functions.api.KVRecord; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Slf4j +public class TestKeyValueSink implements Sink> { + + @Override + public void open(Map config, SinkContext sourceContext) throws Exception { + } + + public void write(Record> record) { + log.info("write {} {} {}", record, record.getClass(), record.getSchema()); + if (!(record instanceof KVRecord)) { + throw new RuntimeException("Expected a KVRecord, but got a "+record.getClass()); + } + KVRecord kvRecord = (KVRecord) record; + if (kvRecord.getKeySchema().getSchemaInfo().getType() != SchemaType.STRING) { + throw new RuntimeException("Expected a String key schema but it was a "+kvRecord.getKeySchema()); + } + + if (kvRecord.getValueSchema().getSchemaInfo().getType() != SchemaType.INT32) { + throw new RuntimeException("Expected a Integer key schema but it was a "+kvRecord.getValueSchema()); + } + } + @Override + public void close() throws Exception { + + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarKeyValueSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarKeyValueSinkTest.java new file mode 100644 index 0000000000000..7537e681584d0 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarKeyValueSinkTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +/** + * Test behaviour of simple sinks + */ +@Slf4j +public class PulsarKeyValueSinkTest extends PulsarStandaloneTestSuite { + + @Test(groups = {"sink"}) + public void testSourceProperty() throws Exception { + String outputTopicName = "test-kv-sink-input-" + randomName(8); + String sinkName = "test-kv-sink-" + randomName(8); + submitSinkConnector(sinkName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestKeyValueSink", JAVAJAR); + + // get sink info + getSinkInfoSuccess(sinkName); + + // get source status + getSinkStatus(sinkName); + + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + @Cleanup Producer> producer = client.newProducer(Schema.KeyValue(String.class, Integer.class)) + .topic(outputTopicName) + .create(); + + producer.send(new KeyValue<>("foo", 1234)); + producer.send(new KeyValue<>("foo", 1234)); + producer.send(new KeyValue<>("foo", 1234)); + producer.send(new KeyValue<>("foo", 1234)); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar > 0); + assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0); + assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0); + }); + } + + + // delete source + deleteSink(sinkName); + + getSinkInfoNotFound(sinkName); + } + + private void submitSinkConnector(String sinkName, + String inputTopicName, + String className, + String archive) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sinks", "create", + "--name", sinkName, + "-i", inputTopicName, + "--archive", archive, + "--classname", className + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands); + assertTrue( + result.getStdout().contains("\"Created successfully\""), + result.getStdout()); + } + + private void getSinkInfoSuccess(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\"")); + } + + private void getSinkStatus(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("\"running\" : true")); + } + + private void deleteSink(String sinkName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName + ); + assertTrue(result.getStdout().contains("successfully")); + result.assertNoStderr(); + } + + private void getSinkInfoNotFound(String sinkName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sinks", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sinkName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains(sinkName + " doesn't exist")); + } + } +} + diff --git a/tests/integration/src/test/resources/pulsar-function.xml b/tests/integration/src/test/resources/pulsar-function.xml index 8da058d26cc3c..919026edbdbee 100644 --- a/tests/integration/src/test/resources/pulsar-function.xml +++ b/tests/integration/src/test/resources/pulsar-function.xml @@ -25,6 +25,7 @@ +