@wolfstudy
Member

@wolfstudy wolfstudy commented Apr 20, 2019

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>

Master Issue: #4042

Fixes #4042

Motivation

Improve the Java Functions API: when you need to set additional TypedMessageBuilder fields on a published message, there is no need to add a new publish method; only the TypedMessageBuilder interface has to be modified.

@wolfstudy
Member Author

@sijie @jerrypeng PTAL. I also have a question: are #4009 and #4042 the same problem?

@sijie
Member

sijie commented Apr 20, 2019

@wolfstudy thank you for picking this up! #4009 and #4042 are addressing the same problem.

Member

I think you should change the publish methods to use newOutputMessage, so that newOutputMessage is covered by the existing tests and you don't need to write new ones.

TypedMessageBuilder<O> messageBuilder = newOutputMessage(topicName, schema);

Member Author

yes, good idea, will fix it

@sijie sijie added the area/function and type/feature (The PR added a new feature or issue requested a new feature) labels Apr 20, 2019
@sijie sijie added this to the 2.4.0 milestone Apr 20, 2019
Contributor

@jerrypeng jerrypeng left a comment

@wolfstudy thanks for working on this!

Though I don't think you are taking the right approach to this problem. The goal of this task is to add/simplify APIs that allow users to:

  1. Set all message properties (except value) for an output message i.e. output returned by the function
  2. Set all message properties (except value) for message published using context.publish

We need something like a MessageConfBuilder on which users can set the message properties they want and then pass it to:

context.publish(output, topic, messageConf/messageConfBuilder)

for publishing messages using the context, and:

context.setOutputMessageConf(messageConf/messageConfBuilder)

for setting that for the output message i.e. output returned by the function.

The usage would be something like this:

MessageConf messageConf = MessageConf.Builder().key("key").properties(myProperties).build();

context.publish("output", "my-topic", messageConf);

// or for output message

context.setOutputMessageConf(messageConf);

@wolfstudy
Member Author

wolfstudy commented Apr 20, 2019

> The usage would be something like this:
>
> MessageConf messageConf = MessageConf.Builder().key("key").properties(myProperties).build();
>
> context.publish("output", "my-topic", messageConf);
>
> // or for output message
>
> context.setOutputMessageConf(messageConf);

Thanks @jerrypeng. My thinking is that we provide the user with a method, e.g. newOutputMessage(), that returns a TypedMessageBuilder, so that the user can set whatever configuration they want and then send the message with sendAsync().

The usage would something like this:

TypedMessageBuilder<String> messageBuilder = context.newOutputMessage("my-topic", Schema.STRING);

messageBuilder.value("output").key("key").properties(myProperties).sendAsync();

Do you think this way can solve the problems we have encountered?

@sijie
Member

sijie commented Apr 20, 2019

Jerry, I don’t think we should invent a new API. We should just use the message builder, so that whatever is available in the Pulsar client is available to function users. E.g. users can configure the key, sequence id and event time as they need.

@sijie
Member

sijie commented Apr 20, 2019

@wolfstudy after looking a bit more into #4009: it is similar, but it is a different topic. The request there is for attaching properties to the result returned by a function. Achieving that requires your change here. I would suggest leaving #4009 to be handled in a subsequent PR.

@wolfstudy
Member Author

> @wolfstudy after looking a bit more into #4009: it is similar, but it is a different topic. The request there is for attaching properties to the result returned by a function. Achieving that requires your change here. I would suggest leaving #4009 to be handled in a subsequent PR.

ok, will remove #4009

@jerrypeng
Contributor

@sijie I strongly disagree. Allowing users to send out messages via

messageBuilder.value("output").key("key").properties(myProperties).sendAsync();

breaks the abstraction of functions. In this case, the user might as well just run a custom Java application. By using a publish method in the context, we can also collect metrics for the user on the number of messages output and on publish errors. Allowing users to do this sidesteps a lot of mechanisms and workflows that are already in place in functions.

@sijie
Member

sijie commented Apr 20, 2019

It doesn’t break the abstraction. context.publish was designed for function users publishing messages inside a function. There is no difference between using context.publish and using a message builder. The current publish method has accumulated a lot of overloads, which make the context interface messy. We can’t keep adding new overloads every time a new feature comes in.

Returning a message builder doesn’t mean you are bypassing the functions framework. You can return a builder implementation which wraps the builder returned by the client, so that when sendAsync is called the metrics are updated correctly.

I strongly believe the message builder is the right approach. I would also strongly disagree with adding any other customized builder or any further overloaded method, which would make the functions API messy.
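
The wrapping idea above can be sketched without any Pulsar classes. This is only a minimal illustration: SketchBuilder, ClientBuilder and StatsBuilder are hypothetical stand-ins, not the real TypedMessageBuilder or the functions runtime, and the two counters stand in for the stats manager.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the client's message builder; not the real Pulsar interface.
interface SketchBuilder<T> {
    SketchBuilder<T> key(String key);
    SketchBuilder<T> value(T value);
    CompletableFuture<String> sendAsync();
}

// Hypothetical "client" builder whose send fails when no value was set.
class ClientBuilder<T> implements SketchBuilder<T> {
    private String key;
    private T value;

    public SketchBuilder<T> key(String key) { this.key = key; return this; }
    public SketchBuilder<T> value(T value) { this.value = value; return this; }

    public CompletableFuture<String> sendAsync() {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (value == null) {
            future.completeExceptionally(new IllegalStateException("no value set"));
        } else {
            future.complete("id:" + key + ":" + value);
        }
        return future;
    }
}

// Wrapper handed to function code: delegates the setters and hooks sendAsync for stats.
class StatsBuilder<T> implements SketchBuilder<T> {
    private final SketchBuilder<T> delegate;
    private final AtomicLong sent;
    private final AtomicLong failed;

    StatsBuilder(SketchBuilder<T> delegate, AtomicLong sent, AtomicLong failed) {
        this.delegate = delegate;
        this.sent = sent;
        this.failed = failed;
    }

    public SketchBuilder<T> key(String key) { delegate.key(key); return this; }
    public SketchBuilder<T> value(T value) { delegate.value(value); return this; }

    public CompletableFuture<String> sendAsync() {
        // The framework still sees every send, even though the user calls sendAsync directly.
        return delegate.sendAsync().whenComplete((id, cause) -> {
            if (cause != null) {
                failed.incrementAndGet();
            } else {
                sent.incrementAndGet();
            }
        });
    }
}

class StatsBuilderDemo {
    public static void main(String[] args) {
        AtomicLong sent = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        SketchBuilder<String> ok = new StatsBuilder<>(new ClientBuilder<String>(), sent, failed);
        ok.key("k").value("v").sendAsync().join();
        SketchBuilder<String> bad = new StatsBuilder<>(new ClientBuilder<String>(), sent, failed);
        bad.key("k").sendAsync(); // no value: the stand-in client fails the future
        System.out.println("sent=" + sent.get() + " failed=" + failed.get()); // prints sent=1 failed=1
    }
}
```

The function author only ever holds the wrapper, so the setters behave exactly like the client's while the send path stays observable by the framework.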

@jerrypeng
Contributor

@sijie I don't want to introduce yet another way for users to publish messages. We already have the publish method, so let's just keep it. We can condense some of the parameters of the publish method into this message conf builder. We can also use the same builder and set it for output messages if users would rather return an output from the function. We should have a uniform way to specify the message configuration for both messages users want to publish and outputs returned by the function.

@jerrypeng
Contributor

Besides, for this "TypedMessageBuilder<String> messageBuilder = context.newOutputMessage("my-topic", Schema.STRING);" we would need to wrap sendAsync somehow so that the functions framework can execute code on the callback to collect stats and handle errors, which can make things more messy and complicated.

@sijie
Member

sijie commented Apr 20, 2019

I strongly believe newOutputMessage is the right approach and that it should replace context.publish. All context.publish methods should be deprecated and changed to use newOutputMessage.

Jerry, if you check the code, the current context.publish really does nothing more than wrap the builder returned by newMessage. Moving the message builder into the interface actually makes a lot of things clearer and reduces the complexity of maintaining various overloaded methods.

TypedMessageBuilder messageBuilder = producer.newMessage();
if (messageConf != null) {
    messageBuilder.loadConf(messageConf);
}
CompletableFuture future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
future.exceptionally(e -> {
    this.statsManager.incrSysExceptions(e);
    logger.error("Failed to publish to topic {} with error {}", topicName, e);
    return null;
});
return future;

@sijie
Member

sijie commented Apr 20, 2019

The only function code involved in the current context.publish is “this.statsManager.incrSysExceptions(e);”.

@jerrypeng
Contributor

jerrypeng commented Apr 20, 2019

> The only function code involved in the current context.publish is “this.statsManager.incrSysExceptions(e);”.

Yes but we need this.

@sijie what do you propose for setting these message configs on outputs returned by the function?

That is why I proposed something like the following, because then it will be uniform:

MessageConf messageConf = MessageConf.Builder().key("key").properties(myProperties).build();

context.publish("output", "my-topic", messageConf);

// or for output message

context.setOutputMessageConf(messageConf);

@sijie
Member

sijie commented Apr 20, 2019

> Yes but we need this.

As I said, it can be added to the wrapper of sendAsync.

For handling the output result, I mentioned the following approach in our DM: the user can get a message builder through “context.newOutputMessage()” and write a function that returns the builder: MessageBuilder<O> process(I input, Context context);. That way, people configure the message for the result in the same way as for “context.publish”.

I don’t like the proposal of “context.setOutputMessageConf”. It is very error-prone. If people don't pay attention, or the implementation doesn’t handle it correctly, the message conf is going to apply to every message, which is not what is intended. Using newOutputMessage gives a clear way of configuring message properties per message.

@jerrypeng
Contributor

jerrypeng commented Apr 20, 2019

@sijie adding support for MessageBuilder<O> process(I input, Context context); will likely involve adding another function interface that users can implement, and adding support for it in the functions framework. This will be a bigger change and perhaps require broader discussion.

I am not sure adding another function interface is the correct way to go about this. My concerns about this:

  1. Users will have to implement another interface to take advantage of being able to set these message confs for returned output. If a user has already written a function using our current function interface, he or she will have to change it. This forces users to make a considerable amount of changes instead of just calling an additional method.

  2. It is an additional interface we will have to support in the function runtime. While this is not a deal breaker for me, I would rather not support any more interfaces than necessary. In my opinion, we should consider this only when we absolutely cannot use the existing function interface to accomplish something, because it adds complexity to the function runtime.

  3. My vision for functions is that we keep a certain distance from the producer and consumer APIs, to distinguish functions as a way to do lightweight compute and not just some generic consume/produce application. So I would rather not have any more APIs that resemble consumer/producer semantics. Again, this is just a preference.

> It is very error-prone. If people don't pay attention, or the implementation doesn’t handle it correctly, the message conf is going to apply to every message

I understand your concern here, but is this:

if (foo != null) {
    messageBuilder.key(foo);
}

if (bar != null) {
    messageBuilder.properties(bar);
}

messageBuilder.value(val);

return messageBuilder;

really less error-prone than:

if (foo != null) {
    messageBuilderConf.setKey(foo);
}

if (bar != null) {
    messageBuilderConf.setProperties(bar);
}

context.setOutputMessageConf(messageBuilderConf);

return val;

@sijie
Member

sijie commented Apr 21, 2019

> is that really less error prone than:

The context is designed to be immutable, so the function implementor should not change any state of the context. Once you introduce context.setOutputMessageConf(messageBuilderConf), you turn the context from immutable to mutable. The problem arises because the context is shared by all the context.publish calls. You therefore have to take care of the ordering between setOutputMessageConf and context.publish; if you don't, race conditions and unpredictable behavior can arise, and a lot of hard-to-find bugs will be introduced. That is what I call error-prone.

In the approach I proposed, context.newOutputMessage() returns a brand-new builder for each call. The function implementation can attach different properties to different builders, and they are isolated from each other. You don't need to worry about the order in which properties are set between two publish calls.

Just think about the following implementation.

MessageBuilder<T> msgBuilder1 = context.newOutputMessage("topic");
msgBuilder1.key("key1");
MessageBuilder<T> msgBuilder2 = context.newOutputMessage("topic");
msgBuilder2.key("key2");
MessageBuilder<T> msgBuilder3 = context.newOutputMessage("topic");

msgBuilder3.value("value3");
msgBuilder2.value("value2");
msgBuilder1.value("value1");

msgBuilder1.sendAsync();
msgBuilder2.sendAsync();
msgBuilder3.sendAsync();
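
The difference can be made concrete with a small, single-threaded sketch. Everything here is hypothetical: SharedConfContext models a context with one shared, mutable message configuration (in the spirit of setOutputMessageConf), BuilderContext models a context that hands out a fresh builder per message, and "publishing" just appends a string to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context holding ONE mutable message configuration shared by all publishes.
class SharedConfContext {
    private String key;
    final List<String> published = new ArrayList<>();

    void setOutputKey(String key) { this.key = key; }
    void publish(String value) { published.add(key + "=" + value); }
}

// Hypothetical context that returns a brand-new builder for every message.
class BuilderContext {
    final List<String> published = new ArrayList<>();

    MessageSketch newOutputMessage() { return new MessageSketch(this); }
}

// Hypothetical per-message builder; its state is private to one message.
class MessageSketch {
    private final BuilderContext context;
    private String key;
    private String value;

    MessageSketch(BuilderContext context) { this.context = context; }

    MessageSketch key(String key) { this.key = key; return this; }
    MessageSketch value(String value) { this.value = value; return this; }
    void send() { context.published.add(key + "=" + value); }
}

class IsolationDemo {
    public static void main(String[] args) {
        // Shared configuration: the key set for the first message leaks into the next one.
        SharedConfContext shared = new SharedConfContext();
        shared.setOutputKey("key1");
        shared.publish("value1");
        shared.publish("value3"); // intended to have no key
        System.out.println(shared.published); // prints [key1=value1, key1=value3]

        // Per-message builders: order of configuration and sending does not matter.
        BuilderContext isolated = new BuilderContext();
        MessageSketch first = isolated.newOutputMessage().key("key1");
        MessageSketch third = isolated.newOutputMessage();
        third.value("value3").send();
        first.value("value1").send();
        System.out.println(isolated.published); // prints [null=value3, key1=value1]
    }
}
```

With the shared configuration, the keyless message silently inherits "key1"; with per-message builders it stays keyless regardless of the order in which the builders are configured or sent.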

> adding support for MessageBuilder<O> process(I input, Context context); will likely involve adding another function interface that users can implement, and adding support for it in the functions framework. This will be a bigger change and perhaps require broader discussion.

@jerrypeng I am not proposing a new interface. It is still the same function interface, but the generic output type of the function is TypedMessageBuilder<O>.

public class MessageBuilderFunction<I, O> implements Function<I, TypedMessageBuilder<O>> {
    @Override
    public TypedMessageBuilder<O> process(I input, Context context) throws Exception {

        TypedMessageBuilder<O> msgBuilder = context.newOutputMessage();
        
        // process(input) stands in for the user's own computation of the output value
        msgBuilder.value(process(input));
        msgBuilder.key(...);

        return msgBuilder;
    }
}

The only change is in PulsarSink#write(Record record): check whether record.getValue() is a TypedMessageBuilder and, if so, use the builder. It is a simple and straightforward change that doesn't change any interfaces.
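
A rough sketch of that dispatch, using stand-in types only: OutBuilder and SinkSketch are hypothetical and do not reflect the actual PulsarSink code, and "writing" simply records a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message builder returned by a function.
class OutBuilder {
    private String key;
    private Object value;

    OutBuilder key(String key) { this.key = key; return this; }
    OutBuilder value(Object value) { this.value = value; return this; }
    String describe() { return (key == null ? "" : key + ":") + value; }
}

// Hypothetical stand-in for the sink that receives whatever the function returned.
class SinkSketch {
    final List<String> written = new ArrayList<>();

    void write(Object result) {
        if (result instanceof OutBuilder) {
            // The function already configured the message: use its builder as-is.
            written.add(((OutBuilder) result).describe());
        } else {
            // Plain result: wrap it in a default builder, as for existing functions.
            written.add(new OutBuilder().value(result).describe());
        }
    }
}

class SinkSketchDemo {
    public static void main(String[] args) {
        SinkSketch sink = new SinkSketch();
        sink.write("plain");
        sink.write(new OutBuilder().key("k").value("built"));
        System.out.println(sink.written); // prints [plain, k:built]
    }
}
```

Existing functions that return a plain value keep working unchanged; only functions that opt in by returning a builder take the first branch.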

Regarding your concerns about my proposal, I believe they come from a misunderstanding of it, but I am replying to them one by one below.

> Users will have to implement another interface to take advantage of being able to set these message confs for returned output. If a user has already written a function using our current function interface, he or she will have to change it. This forces users to make a considerable amount of changes instead of just calling an additional method.

If users already have functions, they probably don't need to attach properties to the output message. If users really want to change old functions to attach properties, all they need to do is change the generic type from O to TypedMessageBuilder<O>, and change return O; to return context.newOutputMessage().value(O);. I don't see how this is a considerably larger change than calling an additional method.

> It is an additional interface we will have to support in the function runtime. While this is not a deal breaker for me, I would rather not support any more interfaces than necessary.

This is not a new interface. As I described above, it only requires changing a few lines in PulsarSink. I don't see why this would be a deal breaker.

> My vision for functions is that we keep a certain distance from the producer and consumer APIs, to distinguish functions as a way to do lightweight compute and not just some generic consume/produce application. So I would rather not have any more APIs that resemble consumer/producer semantics.

Why is it called lightweight computing? Because users don't have to worry about error handling, exactly-once, code distribution, failover and the many other things they have to worry about when using a pure producer or consumer. But that doesn't mean they shouldn't use the well-defined Pulsar API around the message builder.

Pulsar Functions is part of Pulsar. We should use whatever is already defined as an API in Pulsar, rather than spinning off new interfaces.

Hence I don't see how newOutputMessage() would introduce any complexity into functions. It is actually much simpler, more flexible and easier to maintain than the current messy context.publish interfaces.

@jerrypeng
Contributor

@sijie I am ok with what you are proposing

@wolfstudy
Member Author

Thanks @sijie @jerrypeng, I learned a lot.

PTAL again

Member

@wolfstudy Can you add javadoc comment?

Member

Can you add @Deprecated in favor of using {@link #newOutputMessage} to all the context publish methods?

Member

As Jerry and I discussed in the thread, you need to add a simple wrapper to update metrics correctly.

    public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) {
        final TypedMessageBuilder<O> underlyingBuilder = ...;
        return new TypedMessageBuilder<O>() {

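            @Override
            public MessageId send() throws PulsarClientException {
                try {
                    return sendAsync().get();
                } catch (InterruptedException | ExecutionException e) {
                    // get() throws checked exceptions that send() does not declare
                    throw new PulsarClientException(e);
                }
            }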

            @Override
            public CompletableFuture<MessageId> sendAsync() {
                return underlyingBuilder.sendAsync()
                    .whenComplete((result, cause) -> {
                        if (null != cause) {
                            statsManager.incrSysExceptions(cause);
                            logger.error("Failed to publish to topic {} with error {}", topicName, cause);
                        }
                    });
            }

            @Override
            public TypedMessageBuilder<O> key(String key) {
                underlyingBuilder.key(key);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> keyBytes(byte[] key) {
                underlyingBuilder.keyBytes(key);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> value(O value) {
                underlyingBuilder.value(value);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> property(String name, String value) {
                underlyingBuilder.property(name, value);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> properties(Map<String, String> properties) {
                underlyingBuilder.properties(properties);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> eventTime(long timestamp) {
                underlyingBuilder.eventTime(timestamp);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> sequenceId(long sequenceId) {
                underlyingBuilder.sequenceId(sequenceId);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> replicationClusters(List<String> clusters) {
                underlyingBuilder.replicationClusters(clusters);
                return this;
            }

            @Override
            public TypedMessageBuilder<O> disableReplication() {
                underlyingBuilder.disableReplication();
                return this;
            }

            @Override
            public TypedMessageBuilder<O> loadConf(Map<String, Object> config) {
                underlyingBuilder.loadConf(config);
                return this;
            }
        };
    }

Member

Change all the publish implementations to use newOutputMessage.

Member

the logic should be moved to be handled in the message builder wrapper as described above.

@wolfstudy
Member Author

ok, will fix it

@wolfstudy
Member Author

run java8 tests
run cpp tests
run integration tests

Contributor

this was added very recently. I believe it is not included in 2.3.x release. If so we should delete this interface as opposed to deprecating it

Member Author

@jerrypeng @sijie i am ok, what do you think about this?

Member

Good point, @srkukarni ! We can remove this method, since it was introduced in #4005 which has not been released yet.

Contributor

yup we can remove that

Member Author

ok will remove that

@wolfstudy
Member Author

@jerrypeng PTAL again, thanks

<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
Contributor

@sijie @wolfstudy the concern comes back to whether the functions API should depend on the pulsar-client-api

Member

@jerrypeng let's stay in #4127 for the discussion

@wolfstudy
Member Author

@jerrypeng @srkukarni can you review this again? thanks

Contributor

@jerrypeng jerrypeng left a comment

LGTM just two minor issues

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

run java8 tests

1 similar comment
@jerrypeng
Contributor

run java8 tests

Contributor

@srkukarni srkukarni left a comment

The biggest problem for me is the coupling of client-api with functions-api.
Can you please elaborate on what will happen when the client-side API and the server-side API are at different versions? For example, what happens when we release a new feature in the typed builder that a function writer picks up, but the function is submitted to a cluster running an older version of Pulsar that doesn't have this change?

*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.client.api.PulsarClientException;
Contributor

Can you rename this file to say typedMessageBuilderPublish?

Member Author

sure, cool suggestion, thanks @srkukarni will fix it

Contributor

@srkukarni srkukarni left a comment

Upon further thought, I think this is a dep with api and not impl, thus I'm ok with this change. just one last nit wrt filename and we are good to go

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

Thanks @srkukarni for the review. I have already fixed it, PTAL again.

Contributor

The "t" needs to be capitalized. That is the convention for naming classes in java

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy wolfstudy force-pushed the xiaolong/localrun branch from aa90cdb to 9c83892 Compare May 3, 2019 18:39
@wolfstudy
Member Author

run java8 tests
run integration tests

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy wolfstudy force-pushed the xiaolong/localrun branch from 8d7ab69 to 6280e56 Compare May 4, 2019 05:20
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy wolfstudy force-pushed the xiaolong/localrun branch from 9aa8c06 to 3c2e8b2 Compare May 4, 2019 05:24
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy wolfstudy force-pushed the xiaolong/localrun branch from d98efa5 to 80d3ae4 Compare May 4, 2019 07:11
@wolfstudy
Member Author

run java8 tests


Labels

area/function, type/feature (The PR added a new feature or issue requested a new feature)

Development

Successfully merging this pull request may close these issues.

For Java Functions API: Improve how message properties can be set for output messages and context.published messages

4 participants