Skip to content

Conversation

@jerrypeng
Copy link
Contributor

@jerrypeng jerrypeng commented May 20, 2019

Motivation

Support Message object as function input so that users can get all of the attributes of the Message.

Implementing this for windowed function is a more complicated and will discuss/implement after this

If everyone is ok with this approach then I will add tests

Discussions about this was done in:

#4127

@jerrypeng
Copy link
Contributor Author

@ConcurrencyPractitioner please also take a look

import org.apache.pulsar.functions.api.Function;


public class MessageInputFunction implements Function<Message<String>, String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an integration test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie as mentioned in the description of the PR, once everyone is on board with this approach I will add the tests for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie I have added the tests

Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
typeArgs[0] = (Class<?>) actualInputType;
}
if(typeArgs[0].equals(Message.class)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the plan for adding this support to window function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will probably have to change the interface from

public interface WindowFunction<I, O> {
    O process(Collection<Record<I>> input, WindowContext context) throws Exception;
}

to

public interface WindowFunction<I, O> {
    O process(Collection<I> input, WindowContext context) throws Exception;
}

And support "I" as Record for BC and support "I" as Message.

@ConcurrencyPractitioner
Copy link
Contributor

Sure, I have no problem with this. Thanks for picking this up. :)

@jerrypeng
Copy link
Contributor Author

rerun cpp tests

@jerrypeng
Copy link
Contributor Author

@srkukarni @sijie can you look at this again? Thanks

@jerrypeng
Copy link
Contributor Author

jerrypeng commented May 21, 2019

Alternatively, instead of allowing user's to be able to have Message as input to a function, a simpler approach of we would be just to add this API to Context:

 Message getInternalMessage();

and the Impl would be:

Message getInternalMessage() {
return ((PulsarRecord) this.record).getMessage);
}

There also isn't any good way (without breaking BC) for the WindowFunction API to support Message as a input since the WindowFunction API is

public interface WindowFunction<I, O> {
    /**
     * Process the input.
     *
     * @return the output
     */
    O process(Collection<Record<I>> input, WindowContext context) throws Exception;
}

I don't think it makes much sense to do

    O process(Collection<Record<Message<I>>> input, WindowContext context) throws Exception;

We would have to change the interface (BC breaking) to:


public interface WindowFunction<I, O> {
    /**
     * Process the input.
     *
     * @return the output
     */
    O process(Collection<I> input, WindowContext context) throws Exception;
}

To support Message as a input

Alternatively, we can change the API for WindowFunction to be something like

    O process(Window<I> input, WindowContext context) throws Exception;

and you can do

input.getAsMessages()

or 

input.getAsRecords()

or in the WindowContext we implement a method like

context.getWindowAsMessages()

@jerrypeng
Copy link
Contributor Author

The simplest and non-BC-breaking approach I can think of for Functions and WindowFunctions:

just to add these getter methods to Context and WindowContext:

Message getInternalMessage();

and

List<Message> getWindowAsMessages()

@merlimat
Copy link
Contributor

@jerrypeng What about the message type? I believe Context is not generic at this point. Would you get a Message<?> then?

@jerrypeng
Copy link
Contributor Author

@merlimat that is correct which is a con for that approach.

@jerrypeng
Copy link
Contributor Author

@sijie @srkukarni @merlimat what we can do is this:
#4341

simpler approach that solves all our problems

@srkukarni
Copy link
Contributor

+1 for #4341

@jerrypeng jerrypeng closed this May 23, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants