Skip to content

Conversation

@ConcurrencyPractitioner
Copy link
Contributor

@ConcurrencyPractitioner ConcurrencyPractitioner commented Apr 25, 2019

Resolves Issue #4110

Motivation

The message metadata that Pulsar Function uses is unavailable to the user. Consequently, they could not use this metadata for their own computations. We wish to expose this metadata.

Modifications

After discussion in the issue, it has been agreed that adding a new getActualMessage() method to PulsarRecord will help fix this problem.

@ConcurrencyPractitioner
Copy link
Contributor Author

ping @jerrypeng @sijie

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I miss anything in the PR? I only see you changed PulsarRecord. How can people use this interface? Also can you add an example function on how people can use this method?

@ConcurrencyPractitioner
Copy link
Contributor Author

@sijie Oh, well, wasn't this what was proposed in the issue?

@ConcurrencyPractitioner
Copy link
Contributor Author

Alright, @sijie I added a getCurrentMessage() method (I think in line with your approach 2).

@ConcurrencyPractitioner
Copy link
Contributor Author

Retest this please.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConcurrencyPractitioner

Neither ContextImpl.java nor PulsarRecord.java is a public interface class. How can people use this method? Don't you need to add this method to some interfaces?

The reason I asked you to provide a Pulsar Function example is to show the developers how to use this method.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Apr 27, 2019

@sijie. Oh, didn't notice that we were in internals at first. In that case, we would just have to add getCurrentMessage() to the Context interface then. Since that interface is technically a public API, it should be accessible.


@Override
public Object getCurrentMessage() {
return ((PulsarRecord<?>) record).getActualMessage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should just be record.getActualMessage?

Copy link
Contributor Author

@ConcurrencyPractitioner ConcurrencyPractitioner Apr 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, well, it was getActualMessage() for PulsarRecord, but in ContextImpl, I think getCurrentMessage() was proposed to be used instead. I just took the name from what was proposed in the issue.

@ConcurrencyPractitioner
Copy link
Contributor Author

The user I think should be able to retrieve the current message as needed. (which should help resolve the problem which the issue initially posed, i.e. message was unavailable to the user for computation).

/**
* Access the message associated with current input value.
*/
Object getCurrentMessage();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConcurrencyPractitioner

what @jerrypeng proposed in #4110 "we could also add a getActualMessage() method to PulsarRecord" is the right direction. I guess what he means there is "add getAcutalMessage() to Record". because PulsarRecord is an internal class which is not publicly available to the developers who develop a function.

I asked the question "can you add an example function on how people can use this method?", is to help you think from a function developer perspective. You have to be a pulsar function user before you know how to make a good API to pulsar functions users. If you write a pulsar function example to use the method you proposed, you will know whether the method is good or not.

You can find the examples under https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples, which demonstrate how users can use the function API to develop functions. My suggestion to you is to write a pulsar function example first. It would help you a lot on understanding why we need this change and how to provide the right api to the developers.

Now back to the discussion on the interface itself.

  1. I don't think we should add another getCurrent* interface in Context. It makes the interface very confused because there is already a getCurrentRecord. Hence it should be getActualMessage in Context as what @jerrypeng proposed.

  2. Returning Object doesn't help resolving the problem. The function-api dependency doesn't include pulsar-client-api, so developers don't know what type is this object and they can not cast it back to Message<T>.

That means in order to implement the proposal that @jerrypeng proposed in #4110 , you have to do followings:

  • add Message<T> getActualMessage() to Record<T> interface.
  • introduce pulsar-client-api dependency to pulsar-function-api.

If @srkukarni and @jerrypeng agree on introducing pulsar-client-api dependency to pulsar-function-api, then you can implement that as what I point out, and #4110 is done.

However if there are concerns about the pulsar-client-api dependency in pulsar-function-api, then we have to go back and check my original proposal to add the support of using Message<T> as input type. Hence we don't need to include pulsar-client-api as pulsar-function-api, and it also supports people writing functions using java native Function interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah. I was stupid for not understanding what was required. Sorry about that. I took a look at PulsarFunctions examples and it helped. Added a use case in examples.

@ConcurrencyPractitioner
Copy link
Contributor Author

Retest this please.

@sijie
Copy link
Member

sijie commented Apr 28, 2019

@ConcurrencyPractitioner @srkukarni

if pulsar-client-api is not a dependency of pulsar-function-api, how can a function user cast it to Message?

If you are asking people to cast to Message<T>, why not adopt my original proposal in #4110 which is way cleaner that returning Object and asking users to cast Message<T>.

@ConcurrencyPractitioner
Copy link
Contributor Author

Oh, I'm fine with a change in approach. Any thoughts @srkukarni?

@ConcurrencyPractitioner ConcurrencyPractitioner changed the title [Issue#4110] [component/functions] Adding getActualMessage() method [Issue#4110] [component/functions] Adding message as source or input of Function Apr 28, 2019
@jerrypeng
Copy link
Contributor

jerrypeng commented Apr 29, 2019

@ConcurrencyPractitioner @sijie @srkukarni

Approaches I thought of:

  1. If we don't want to pull in pulsar-client-api and we want to the users to be able to get all of the properties of a Message, then we will have to add the corresponding methods to the Record interface to expos all the properties.

Pros: pulsar-function-api do not need to depend on pulsar-client-api

Cons: As @sijie mentioned before, we need properties/methods are add to Message, we will also need to add them to the Record interface. However, my biggest concern with this approach is that we already have methods like getRecordSequence in the Record interface (which is different then the message sequence id though they return different forms of the same thing). We will have to add another method getSequenceId to the Record interface which will be confusing. Thus, I am not for this method

  1. We add an additional method to PulsarRecord i.e. public Message<T> getRawMessage(). Add an method to the Context interface i.e. pubic Message<T> getRawInputMessage. In the implementation of the method, we will just cast Record to PulsarRecord which is assumed for functions anyways.

Pros: Simple addition

Cons: pulsar-function-api need to depend on pulsar-client-api. Though this is not a deal breaker for me.

@sijie
Copy link
Member

sijie commented Apr 29, 2019

@jerrypeng have you checked my initial proposal? the approaches you mentioned are not the proposal I proposed initially in #4110

@jerrypeng
Copy link
Contributor

@sijie want is your point?

@sijie
Copy link
Member

sijie commented Apr 29, 2019

my original proposal provides a cleaner approach. we don't have to deal with the pulsar-client-api and pulsar-function-api dependency and people doesn't have to cast to an PulsarRecord to get the actual message. Things are handled properly and gracefully.

@jerrypeng
Copy link
Contributor

@sijie first of all I am not proposing user's casting Record -> PulsarRecord. We can do that internally in the implementation and just return to the user the Message interface.

@sijie the approach you suggest will still require the user to pull in the pulsar-client-api.

@jerrypeng
Copy link
Contributor

@sijie I am also ok will your approach if everyone one else is onboard

@sijie
Copy link
Member

sijie commented Apr 29, 2019

the approach you suggest will still require the user to pull in the pulsar-client-api.

@jerrypeng

it is different from adding a dependency to pulsar-function-api though. In my approach, pulsar-client-api will be treated as "user function dependency".

@srkukarni
Copy link
Contributor

My biggest concern is about adding pulsar-client dep to functions interface. My inclination would be to keep the interfaces seperate as much as possible.
@sijie just thinking aloud here:- there might be specialization interfaces that might have closer pulsar integration. Just like windowing, we could have your MessageFunction api, but keep it at a user layer?

@sijie
Copy link
Member

sijie commented Apr 29, 2019

Sanjeev

Just like windowing, we could have your MessageFunction api, but keep it at a user layer?

My original proposal doesn’t change function api at all. It is a runtime change to support user behavior.

@jerrypeng
Copy link
Contributor

@sijie @srkukarni though for this PR #4093

It's already trying to add pulsar-client-api as a dependency of pulsar-functions-api

@jerrypeng
Copy link
Contributor

If that PR is going in then or discussion about whether to add pulsar-client-api as a dependency of pulsar-functions-api becomes irrelevant.

I guess we should discuss whether or not adding pulsar-client-api as a dependency of the functions api is appropriate or not.

Ideally we should try keep these separate so we don't create more a of mess of dependencies.

What are (if any) the potential dependency related problems we would see if this happens?

@sijie
Copy link
Member

sijie commented Apr 30, 2019

If we are going to support publishing Messages in function or retrieving a Message instance (for accessing the full list of metadata associated with a message), we have to include pulsar-client-api as an dependency of pulsar-function-api. So my vote here will just add the dependency for both #4127 and #4042 .

For 4127, I still don't think casting is a good approach. If a record can be a non-pulsar-message record (because we used same abstraction for source connectors), my vote here will be providing the support of using Message<T> as the generic type for Function Input Type, hence users don't have to cast.

@jerrypeng
Copy link
Contributor

@sijie I am ok with adding pulsar-client-api as a dependency of the function-api. I can't think of a problem that may cause.

@jerrypeng
Copy link
Contributor

However, now we will have multiple ways to get the same data. One from Message and the other from context.getRecord. This is not an ideal situation.

What about Python runtime ? There is no way for a user to specify a Message type there. If we were to support this for python, it will most likely need to be exposed with context. I would like to see some uniformity in how we do this across languages though.

@sijie
Copy link
Member

sijie commented Apr 30, 2019

If we were to support this for python, it will most likely need to be exposed with context. I would like to see some uniformity in how we do this across languages though.

It is some not all. Most of the features available in Java Client / Function is specific to Java, because Java supports Generic Type. Most of other languages like Python and Go don't support Generic Type. The approach I am proposing is using Generic Type, which is specific to Java.

The concern I have with exposing Message in Java Record is casting. Because the Java function code is also used for pulsar-io (source and sink). But there are not source and sinks in Python or Go.

Hence IMO it is really hard to achieve uniformity across the languages from many aspects.

Also, the original motivation of functions is to let user write the function in a native way as how they write a normal function in their preferred language. If that still stands, we should just consider the most native approach for each language rather than choosing the uniformity. If you look into severless world, you will find that it is really hard to get the uniformity across languages.

@jerrypeng
Copy link
Contributor

@ConcurrencyPractitioner @sijie @srkukarni lets continue progress in this as there are users waiting for this feature. I am for with allowing functions to has Message<T> as an input as @sijie suggested. Is everyone ok with that?

@sijie
Copy link
Member

sijie commented May 14, 2019

I am for with allowing functions to has Message as an input as @sijie suggested. Is everyone ok with that?

+1 from me

@ConcurrencyPractitioner
Copy link
Contributor Author

Cool, I'm fine with implementing that. @srkukarni I guess if you don't have any problems with this, then we could go ahead and get started.

@ConcurrencyPractitioner
Copy link
Contributor Author

Oh, then with this approach, what are the high level steps in adding Message as an acceptable input format?

@srkukarni
Copy link
Contributor

Since we have already added client-api dep on functions-api, i will withdraw my concerns about dep. so please go ahead. One thing that also needs to be done is allow Message in windowing api as well.

@ConcurrencyPractitioner
Copy link
Contributor Author

Alright, so I did some digging, and I have some ideas on how to do it. But I just want to make sure that my thoughts on the implementation is right. @srkukarni @sijie or @jerrypeng Do you mind explaining the high level steps involved in coding this approach? It doesn't have to be long, maybe just some pointers.

@jerrypeng
Copy link
Contributor

@sijie @srkukarni if we go with this approach and a user has something like the following:

public class MyFunction implements Function<Message<InputType>, OutputType> {

public void process(Message<InputType>, Context context) {
....
}

}

what should go into the Protobuf field in the SourceSpec typeClassName? Message.class? But then we don't store the actual type of the input.

@srkukarni
Copy link
Contributor

@jerrypeng in the protobuf, we should store inputtype. The rest of the logic handling should be inside the code.

@jerrypeng
Copy link
Contributor

@sijie @srkukarni @ConcurrencyPractitioner I think we can close this?

I think everyone is in favor of just doing this #4341 instead

@sijie
Copy link
Member

sijie commented Jun 20, 2019

close this issue since #4341 has already implemented it

@sijie sijie closed this Jun 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants