Skip to content

Conversation

@sijie
Copy link
Member

@sijie sijie commented Jan 31, 2018

Motivation

This closes #1058

Modifications

  • introduce CommandConsumerGroupChange for notifying consumers with consumer group changes
  • bump the protocol version to v11
  • introduce ConsumerGroupListener for listening to group changes

Result

The consumer is able to register a ConsumerGroupListener to listen on group changes.

@sijie sijie added this to the 2.0.0-incubating milestone Jan 31, 2018
@sijie sijie self-assigned this Jan 31, 2018
@sijie sijie requested review from merlimat and rdhabalia January 31, 2018 02:07
@merlimat merlimat added the type/feature The PR added a new feature or issue requested a new feature label Jan 31, 2018
return false;
} else {
// If the active consumer is changed, send notification.
notifyConsumerGroupChanged(activeConsumer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual change in active consumer can happening a bit later. There is an artificial delay that is in place for a couple of reasons:

  1. Avoid flickering when multiple consumer are disconnecting/reconnecting (eg: during a failover scenario). Ideally we don't want to jump around and elect many active consumers when that happens.
  2. Avoid "most" duplicates due to in-flight (not acked) messages that get resent to the newly elected active consumer.

Here, between this time, when a consumer is elected and when it actually start to get messages, we should avoid notifying that it is "active", because it might get demoted again before it actually gets any messages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat I have changed to notify active consumer changes only after the cursor is rewinded. please review it again.

* @see #setConsumerGroupListener(ConsumerGroupListener)
* @since 1.22.0
*/
public ConsumerGroupListener getConsumerGroupListener() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't use the termConsumerGroup anywhere else, but rather Subscription and Consumer, I think we should be sticking with it. Something like "ActiveConsumerListener"?

Also what would be the semantic on setting the listener on Shared/Exclusive subscriptions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed "ConsumerGroupListener" to "ActiveConsumerListener".

I added the logic on pulsar client to fail subscribe if a listener is set for non-failover subscription.

@sijie
Copy link
Member Author

sijie commented Feb 10, 2018

@merlimat I have addressed your comments. can you review this again? After it looks good, I will rebase it to latest master.

@merlimat merlimat changed the title Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group Feb 10, 2018
log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}",
consumerId, topicName, subscription.getName(), activeConsumerId);
}
cnx.ctx().write(Commands.newConsumerGroupChange(consumerId, activeConsumerId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use writeAndFlush(), otherwise the message won't be written on the socket until and unrelated flush is triggered.

Also, pass cnx.ctx().voidPromise() as the 2nd parameter to avoid creating the write promise since we don't care here.

}

void notifyConsumerGroupChange(long activeConsumerId) {
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking here that we could have more descriptive methods in Commands class to group all these feature checks in a single place. Eg:

if (!Commands.peerSupportsActiveConsumerListener(cnx)) {
    return;
}
...

(I know in other places we're checking the version like this, and we should also change separately from this PR)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing we need to be careful here is that C++ code gets compiled and automatically will present itself with the latest version v11 (or whatever).

In this case, we should make sure that C++ client can at least discard the notification

/**
* Listener on the consumer state changes.
*/
public interface ActiveConsumerListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use a single method, it becomes accessible through a lambda. Eg:

interface ActiveConsumerListener {
   void statusChanged(Consumer, int partitionId, Status status);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think lambda is good for non-branchable operations. However it doesn't fit well for branchable operations. we can make a single method here using Status, however the application logic has to write if-else block to take different actions. It is much clear to me if we have two different methods for two different actions. so I chose to have separated methods for handling become active and become inactive, when applications are wring the listener, you just need to think what should do when it become active and what should do when it become inactive.

However if you think using Status and having a lambda interface is much better, let me know. I will make the change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that makes sense.

One other thing that I was thinking is that we could have this listener to be more general oriented, so that we could in future reuse it for other kinds of notifications.

Something like ConsumerEventsListener so that is more neutral? What do you think?

}
}

void consumerGroupChanged(long activeConsumerId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumerGroupChanged() --> activeConsumerChanged()

new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}

if (conf.getActiveConsumerListener() != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it doesn't make a lot of sense to check for active/inactive on a non failover subscription. But wouldn't be in a way still semantically correct to always notify of "active" in exclusive/shared mode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main motivation for this change here is for supporting failover subscription. I would disable this feature for other subscriptions for now, and introduce it if we really need it. does that make sense to you?

}

// changes on consumer group
message CommandConsumerGroupChange {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommandActiveConsumerChanged

protected void pickAndScheduleActiveConsumer() {
protected void notifyConsumerGroupChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should try to avoid to send multiple "inactive" notifications to a consumer.
If I was already "inactive", I don't need to be notified when a new consumer is promoted and I'm not involved.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the first iteration for this listener. I am trying to be avoiding any optimization at the beginning and deliver notifications for every changes. This way we can make sure the first version is "eventually" correct without hitting any race condition on handling this kind of smart logic. If sending multiple notifications become a problem we can do a subsequent change later.

does that make sense to you?

protected void notifyConsumerGroupChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
consumer.notifyConsumerGroupChange(activeConsumer.consumerId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumerId is a per-connection identifier (unique within a single TCP connection), but different connections from different clients can very well use the same Id. (The id is just a shortcut to refer to a particular topic/subscription without passing the names each time).

For consumers on a single subscription, there is no single unique identifier. consumerName is generally a random generated string, though there is collision detection (since collisions wont affect the correctness) and it can be manually overridden.

I think the only way is to use the Consumer object identify.

// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (null == currentActiveConsumer) {
log.warn("Current active consumer disappears while adding consumer {}", consumer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a debug log

@sijie
Copy link
Member Author

sijie commented Feb 13, 2018

@merlimat I have addressed your comments, please take another round of review. If this looks good for you, I will rebase it to latest master.

/**
* @return this configured {@link ActiveConsumerListener} for the consumer.
* @see #setActiveConsumerListener(ActiveConsumerListener)
* @since 1.22.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since 2.0

@merlimat
Copy link
Contributor

@sijie Change looks good, though there's still the issue at https://github.com/apache/incubator-pulsar/pull/1156/files#r167413443 . I think is using the consumerId of the active consumer to decide if other consumers are active/inactive, but we cannot compare consumerId across consumers.

The other comment was around making the Listener interface name more neutral so that we can add more event handlers, if (and when) the need arises, without having to add an additional listener.

Use consumer object as identifier for comparison
@sijie
Copy link
Member Author

sijie commented Feb 14, 2018

@merlimat as you suggested, I did two changes here:

  • rename ActiveConsumerListener to ConsumerEventListener. currently since we only use it for failover subscription, so I am still disallowing shared/exclusive subscription to use the listener. we can enable if we support more events in future.

  • I changed the dispatcher to use consumer object as the comparison identifier.

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@ivankelly ivankelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this provide a way to figure out the current active consumer?

This could also have been done by each consumer periodically adding a message to the topic. if you start getting the messages, you are the active consumer.

/**
* Notified when the consumer group is changed, and the consumer becomes the active consumer.
*/
void becomeActive(Consumer consumer, int partitionId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

became?

@merlimat
Copy link
Contributor

Does this provide a way to figure out the current active consumer?

Not right now, I think it would be interesting to mention who's the active consumer in the topic in the topics stats. We could have a Boolean field in the ConsumerStats that is set to null in non-failover subscriptions.

@sijie
Copy link
Member Author

sijie commented Feb 14, 2018

This could also have been done by each consumer periodically adding a message to the topic. if you start getting the messages, you are the active consumer.

@ivankelly : yes that's the trick we are using now for the other project.

@sijie
Copy link
Member Author

sijie commented Feb 14, 2018

rebased to the latest master

@sijie
Copy link
Member Author

sijie commented Feb 14, 2018

the CI failed with license header. fixed license header.

I forgot another item to make CPP client ignore this notification.

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Provide a mechanism to tell whether a consumer is the leader of a failover subscription

3 participants