forked from apache/flink
Open
Description
Describe the bug
When PulsarSink is used with DeliveryGuarantee.EXACTLY_ONCE together with delaySendingMessage, the deliverAtTime is not honored: a consumer can receive the message before deliverAtTime.
To Reproduce
Steps to reproduce the behavior:
- SinkDemo
PulsarSink.builder()
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setSerializationSchema(serializationSchema)
    .setTopics(topic)
    .setProperties(props)
    .delaySendingMessage(MessageDelayer.fixed(Duration.ofMinutes(1)))
    .build();
- SourceDemo
PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setDeserializationSchema(deserializationSchema)
    .setTopics(topic)
    .setSubscriptionName("my-subscription")
    .setSubscriptionType(SubscriptionType.Shared)
    .build();
Expected behavior
SourceDemo should receive each message no earlier than 1 minute after it was sent. Actual behavior: the Pulsar source receives the message before the deliverAtTime recorded in the message metadata.
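To make the expected and actual behavior concrete, here is a minimal sketch of the check a test could apply to each received message. DelayCheck and its methods are hypothetical names introduced for illustration; they are not part of Flink or Pulsar. How deliverAtTime and the receive time are obtained from the message is left as an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of Flink or Pulsar.
final class DelayCheck {
    private DelayCheck() {}

    /** True if the message was received before its deliverAtTime. */
    static boolean deliveredEarly(Instant deliverAt, Instant receivedAt) {
        return receivedAt.isBefore(deliverAt);
    }

    /** How far ahead of deliverAtTime the message arrived; ZERO if on time or late. */
    static Duration earlyBy(Instant deliverAt, Instant receivedAt) {
        Duration gap = Duration.between(receivedAt, deliverAt);
        return gap.isNegative() ? Duration.ZERO : gap;
    }
}
```

With the 1 minute fixed delay from SinkDemo, a message received 5 seconds after sending would be flagged by deliveredEarly, which is the behavior reported here. The expected behavior is that deliveredEarly is false for every message.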
Screenshots
N/A
Additional context
Java client API: org.apache.flink:flink-connector-pulsar:1.15.1
Deploy mode: standalone on Docker (2.10.1) and cluster on k8s (2.9.x)