-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Set key for message when using function publish #4005
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we want to always do it this way. For example, what if we have multiple input topics with different keys, etc.
Would it be better to explicitly have an extra parameter?(Both for java and py)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that should be the default behavior if the user doesn't explicitly specify the key since that is the behavior for objects returned from the function. We can add an additional method in which the user can explicitly specify a key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a python expert. but this PR doesn't seem to specify partition_key in the method signature. how does that work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie I have updated the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a ut or integration test for this? this raises a huge concern to me - we update the implementation but didn't change the interface definition. how can we guarantee the fix is correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie you reviewed the PR when I was in the middle of updating the PR due to discussions and comments around this PR. The original PR did not modify any of the interfaces, but after a discussion with @srkukarni and @merlimat, we decided it was best if we let the user choose what the partition key is. That said, I will add some tests to it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually @sijie if you look at the commit you commented on. The signature of contextimpl.publish didn't change. Thus context.py did not need to be updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I might be commenting in between commits. but my question is still there : how can we guarantee the fix is correct? where are the tests for this change?
|
@sijie in my previous comment, I already said I was going to add the tests.
Please read my responses in their entirety before responding.
…On Mon, Apr 8, 2019 at 8:07 PM Sijie Guo ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In pulsar-client-cpp/python/pulsar/functions/context.py
<#4005 (comment)>:
> @@ -126,7 +126,8 @@ def record_metric(self, metric_name, metric_value):
@AbstractMethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
Sure I might be commenting in between commits. but my question is still
there : how can we guarantee the fix is correct? where are the tests for
this change?
—
You are receiving this because you were assigned.
Reply to this email directly, view it on GitHub
<#4005 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/ADcir6hlv1dcxwEj9LyuwhTw3xsfOlI-ks5vfAPtgaJpZM4ci9Wj>
.
|
|
@jerrypeng got it now. thank you |
…t_publish_set_key
6cedfa5 to
ba9b884
Compare
|
@merlimat @jerrypeng @sijie please review this PR again. I have rewritten this PR to use the change @merlimat made for MessageBuilder. I added also included several tests |
merlimat
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
srkukarni
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to add an integration test to cover this feature?
sijie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerrypeng overall looks good to me. left a few comments there. one of the main questions is around the partition_key you added to the output message. I am not sure if that's correct, unless I am missing some context there.
| pass | ||
|
|
||
| @abstractmethod | ||
| def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: any reason why do we have two methods here? doesn't the named argument message_conf=None is good enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie are you talking about
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
and
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
The former is for backwards compatibility purposes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in python you don't need two methods for backwards compatibility though. one method with named argument is good enough
| * | ||
| * @return A future that completes when the framework is done publishing the message | ||
| */ | ||
| <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it better to expose publish(String topicName, O object, Schema<T> schema, Map<String, Object> messageConf)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do some caching with schemaOrSerdeClassName so that we don't instantiate the schema/SerDe every single type. And we use schemaOrSerdeClassName as the key for the cache. Not greatest user experience but that is what is happening now
| if output_bytes is not None: | ||
| props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())} | ||
| self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message, self.producer.topic()), properties=props) | ||
| self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message, self.producer.topic()), properties=props, partition_key=msg.message.partition_key()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are using the partition_key from the input message as the partition_key for the output message. it doesn't seem to be correct to me.
any reason why you are setting this key here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent with Java, but I will remove this for now
| if (context.getCurrentRecord().getKey().isPresent()) { | ||
| messageConf.put("key", context.getCurrentRecord().getKey().get()); | ||
| } | ||
| messageConf.put("eventTime", System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we use the constants defined in TypedMessageBuilder.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will change
|
@sijie thanks for the review. I have updated the PR. |
|
rerun java8 tests |
|
rerun java8 tests |
|
rerun java8 tests |
|
rerun integration tests |
|
rerun java8 tests |
### Motivation Pulsar release 2.4.0 supports setting partition key for messages when using function publish, as described in this PR: #4005 The code is updated but the doc is not updated accordingly. ### Modifications 1: Update the doc based on the code updated. 2: also add some legacy missing methods in the context class of the python functions according to the code. 3: This update is only for Pulsar 2.4.0 release. No need to update docs in 2.4.1 and later releases as these docs have been updated.
### Motivation Pulsar release 2.4.0 supports setting partition key for messages when using function publish, as described in this PR: apache#4005 The code is updated but the doc is not updated accordingly. ### Modifications 1: Update the doc based on the code updated. 2: also add some legacy missing methods in the context class of the python functions according to the code. 3: This update is only for Pulsar 2.4.0 release. No need to update docs in 2.4.1 and later releases as these docs have been updated.
Motivation
In java functions, when a user calls publish, the outgoing message does not have a key set even though in the input message has a key. By default, set the same key for the outgoing publish method. This is the behavior when sending messages that are returned by the function
In python functions, the message key is never set. Make python functions consistent with java functions