From 6445254309561d7957516b18099241bf18b5b7a3 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Sun, 19 May 2019 23:39:19 -0700 Subject: [PATCH 1/4] Support Pulsar Message as function input --- .../functions/instance/JavaInstance.java | 13 +++++-- .../instance/JavaInstanceRunnable.java | 10 ++++-- .../pulsar/functions/source/PulsarRecord.java | 1 + .../functions/instance/JavaInstanceTest.java | 4 +-- .../functions/utils/FunctionCommon.java | 36 +++++++++++++++++++ 5 files changed, 58 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 8aee702a9fd49..cba1106b55425 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -24,6 +24,7 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.source.PulsarRecord; import java.util.Map; @@ -39,8 +40,9 @@ public class JavaInstance implements AutoCloseable { private final ContextImpl context; private Function function; private java.util.function.Function javaUtilFunction; + private boolean isMessageInput; - public JavaInstance(ContextImpl contextImpl, Object userClassObject) { + public JavaInstance(ContextImpl contextImpl, Object userClassObject, boolean isMessageInput) { this.context = contextImpl; @@ -50,14 +52,21 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject) { } else { this.javaUtilFunction = (java.util.function.Function) userClassObject; } + this.isMessageInput = isMessageInput; } - public JavaExecutionResult handleMessage(Record record, Object input) { + public JavaExecutionResult handleMessage(Record record) { if (context != null) { context.setCurrentMessageContext(record); } JavaExecutionResult executionResult = new JavaExecutionResult(); + Object input; try { + if (isMessageInput) { + input = ((PulsarRecord) record).getMessage(); + } else { + input = record.getValue(); + } Object output; if (function != null) { output = function.process(input, context); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index b6c4fc86d69ea..cd5d255374805 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -64,6 +64,7 @@ import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; +import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.utils.Reflections; @@ -166,6 +167,8 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, // metrics collection especially in threaded mode // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down this.collectorRegistry = collectorRegistry; + + } /** @@ -191,6 +194,9 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception { throw new RuntimeException("User class must either be Function or java.util.Function"); } + // check if input is Message + boolean isMessageInput = FunctionCommon.isFunctionInputMessage(object.getClass()); + // start the state table setupStateTable(); // start the output producer @@ -200,7 +206,7 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception { // start any log topic handler setupLogHandler(); - return new JavaInstance(contextImpl, object); + return new JavaInstance(contextImpl, object, isMessageInput); } ContextImpl setupContext() { @@ -254,7 +260,7 @@ public void run() { stats.processTimeStart(); // process the message - result = javaInstance.handleMessage(currentRecord, currentRecord.getValue()); + result = javaInstance.handleMessage(currentRecord); // register end time stats.processTimeEnd(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index c03ed9fa834a9..de651c7105ab4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -40,6 +40,7 @@ public class PulsarRecord implements RecordWithEncryptionContext { private final String topicName; private final int partition; + @Getter private final Message message; private final Runnable failFunction; diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 0cb361de67fa8..ee15bbfe1932b 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -36,9 +36,9 @@ public class JavaInstanceTest { public void testLambda() { JavaInstance instance = new JavaInstance( mock(ContextImpl.class), - (Function) (input, context) -> input + "-lambda"); + (Function) (input, context) -> input + "-lambda", false); String testString = "ABC123"; - JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); + JavaExecutionResult result = instance.handleMessage(() -> testString); assertNotNull(result.getResult()); assertEquals(new String(testString + "-lambda"), result.getResult()); instance.close(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 571a614974dad..b9eb11a002dc8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.UUID; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; @@ -98,6 +99,7 @@ public static Class[] getFunctionTypes(Class userClass, boolean isWindowConfi if (isWindowConfigPresent) { if (WindowFunction.class.isAssignableFrom(userClass)) { typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, userClass); + } else { typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass); if (!typeArgs[0].equals(Collection.class)) { @@ -108,17 +110,51 @@ public static Class[] getFunctionTypes(Class userClass, boolean isWindowConfi Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0]; typeArgs[0] = (Class) actualInputType; } + if(typeArgs[0].equals(Message.class)) { + throw new IllegalArgumentException("Window function does no support input as Message"); + } } else { if (Function.class.isAssignableFrom(userClass)) { typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass); + // check if input is of Message.class if so extract nested type + if (typeArgs[0].equals(Message.class)) { + typeArgs[0] = getMessageType(Function.class, userClass); + } } else { typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass); + // check if input is of Message.class if so extract nested type + if (typeArgs[0].equals(Message.class)) { + typeArgs[0] = getMessageType(java.util.function.Function.class, userClass); + } } } return typeArgs; } + public static Class getMessageType(Class functionType, Class userClass) { + ParameterizedType type = (ParameterizedType) TypeResolver.resolveGenericType(functionType, userClass); + ParameterizedType nestedType = (ParameterizedType) type.getActualTypeArguments()[0]; + Type actualType = nestedType.getActualTypeArguments()[0]; + return (Class) actualType; + } + + public static boolean isFunctionInputMessage(Class userClass) { + Class[] typeArgs; + if (Function.class.isAssignableFrom(userClass)) { + typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass); + if (typeArgs[0].equals(Message.class)) { + return true; + } + } else { + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass); + if (typeArgs[0].equals(Message.class)) { + return true; + } + } + return false; + } + public static Object createInstance(String userClassName, ClassLoader classLoader) { Class theCls; try { From b3944225f8b6f4fce14c8aa9ac056cee9c837f7f Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Sun, 19 May 2019 23:41:22 -0700 Subject: [PATCH 2/4] adding example --- .../api/examples/MessageInputFunction.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputFunction.java diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputFunction.java new file mode 100644 index 0000000000000..1f62dded89386 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputFunction.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + + +public class MessageInputFunction implements Function, String> { + + @Override + public String process(Message input, Context context) throws Exception { + return input.getKey() + "-" + input.getValue(); + } +} From a85950e925528bd7f79f8db7cf726b500411a057 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 20 May 2019 15:34:38 -0700 Subject: [PATCH 3/4] add tests --- .../worker/PulsarFunctionE2ESecurityTest.java | 2 +- .../pulsar/io/PulsarFunctionE2ETest.java | 151 ++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 775a4bfa5a316..0b293f24f386f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -103,7 +103,7 @@ public class PulsarFunctionE2ESecurityTest { private static final String ADMIN_SUBJECT = "superUser"; private static final String ANONYMOUS_ROLE = "anonymousUser"; - private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); + private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ESecurityTest.class); private String adminToken; private String brokerServiceUrl; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 40b1783e6db7c..922cc684bc8dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -1565,6 +1565,157 @@ public void testFunctionAutomaticSubCleanup() throws Exception { assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); } + @Test + public void testMessageAsInputJavaNativeFunction() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarFunction-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe(); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespacePortion); + functionConfig.setName(functionName); + functionConfig.setParallelism(1); + functionConfig.setInputs(Collections.singleton(sourceTopic)); + functionConfig.setClassName("org.apache.pulsar.functions.api.examples.MessageInputJavaNativeFunction"); + functionConfig.setOutput(sinkTopic); + functionConfig.setCleanupSubscription(false); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setSubName(subscriptionName); + + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription(); + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription()); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar source consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "val_" + i; + String key = "key_" + i; + producer.newMessage().key(key).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + for (int i = 0; i < totalMsgs; i++) { + String data = "val_" + i; + String key = "key_" + i; + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertEquals(key + "-" + data, msg.getValue()); + assertEquals(key, msg.getKey()); + } + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + } + + @Test + public void testMessageAsInputFunction() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarFunction-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe(); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespacePortion); + functionConfig.setName(functionName); + functionConfig.setParallelism(1); + functionConfig.setInputs(Collections.singleton(sourceTopic)); + functionConfig.setClassName("org.apache.pulsar.functions.api.examples.MessageInputFunction"); + functionConfig.setOutput(sinkTopic); + functionConfig.setCleanupSubscription(false); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setSubName(subscriptionName); + + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription(); + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription()); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar source consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "val_" + i; + String key = "key_" + i; + producer.newMessage().key(key).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + for (int i = 0; i < totalMsgs; i++) { + String data = "val_" + i; + String key = "key_" + i; + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertEquals(key + "-" + data, msg.getValue()); + assertEquals(key, msg.getKey()); + } + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + } public static String getPrometheusMetrics(int metricsPort) throws IOException { StringBuilder result = new StringBuilder(); From 7607e6f7db510721250b2d39aa44c23f42dc867d Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 20 May 2019 18:16:27 -0700 Subject: [PATCH 4/4] add example function --- .../MessageInputJavaNativeFunction.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputJavaNativeFunction.java diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputJavaNativeFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputJavaNativeFunction.java new file mode 100644 index 0000000000000..47afe70f3536a --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MessageInputJavaNativeFunction.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.client.api.Message; + +public class MessageInputJavaNativeFunction implements java.util.function.Function, String> { + @Override + public String apply(Message input) { + return input.getKey() + "-" + input.getValue(); + } +}