Skip to content

KAFKA-9360: Fix heartbeat and checkpoint emission can not turn off#7887

Merged
mimaison merged 1 commit intoapache:trunkfrom
ning2008wisc:fix_knob
Jan 30, 2020
Merged

KAFKA-9360: Fix heartbeat and checkpoint emission can not turn off#7887
mimaison merged 1 commit intoapache:trunkfrom
ning2008wisc:fix_knob

Conversation

@ning2008wisc
Copy link
Copy Markdown
Contributor

@ning2008wisc ning2008wisc commented Jan 2, 2020

emit.heartbeats.enabled and emit.checkpoints.enabled are supposed to
be the knobs to control if the heartbeat message or checkpoint message
will be sent or not to the topics respectively. In our experiments,
setting them to false will not suspend the activity in their SourceTasks,
e.g. MirrorHeartbeatTask, MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of
SourceRecord are being sent without interval, causing significantly high
CPU usage of MirrorMaker 2 instance, GC time and congesting the single partition of
the heartbeat topic and checkpoint topic.

Screen Shot 2020-01-02 at 2 55 38 PM

Screen Shot 2020-01-02 at 3 18 23 PM

The proposed fix in the following PR is to (1) explicitly check if interval
is set to negative (e.g. -1), when the emit.heartbeats.enabled or
emit.checkpoints.enabled is off. (2) if interval is indeed set to negative,
put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in order
to prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages
to Kafka topics

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ning2008wisc ning2008wisc changed the title Fix heartbeat and checkpoint emission can not turn off KAFKA-9360: Fix heartbeat and checkpoint emission can not turn off Jan 2, 2020
Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for finding this bug! Your approach looks fine, but I think we can do better. You'll see I've got this logic already:

stopped.await(interval.toMillis(), MILLISECONDS)

... which will wait forever unless the task is stopped. As soon as the task is stopped, e.g. during a rebalance, the await() returns and unblocks the WorkerSourceTask.

I think we can do something like that in poll() too. If interval is negative, just await MAX_LONG or something:

if (interval.isNegative()) {
    stopped.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
}

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@ryannedolan great comments. updated the PR

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@mimaison when you have some free cycle, do you mind to review this PR? Thanks

@mimaison
Copy link
Copy Markdown
Member

Thanks for the PR.

Is it possible to not even start the connectors if they are disabled? Could we do that in https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L203?

@ryannedolan
Copy link
Copy Markdown
Contributor

ryannedolan commented Jan 13, 2020

Another approach is to create zero tasks when these are disabled. (i.e. create the connectors but no tasks).

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@ryannedolan
Copy link
Copy Markdown
Contributor

I'm fine with any of these approaches. The nice thing about having the Connectors and/or Tasks run, even if they don't do anything, is they could potentially still emit metrics and log messages, maybe.

You'll notice that Herders are started for each cluster-cluster pair (in MirrorMaker.java), even for pairs that are "disabled". This is because we want to emit Heartbeats even when no replication is done. So you can have A->B.enabled = false and no replication will be done, but you'll still see heartbeats emitted to B (just not replicated from A back to B).

At one point I had logic in MirrorMaker.java to not create herders for disabled flows, as you might expect would happen. But then almost everything broke, because so much depends on heartbeats. For example, if you have A->B enabled and B->A disabled, you'd get no heartbeats in either direction. This is because the A->B herder emits heartbeats to B, and the B->A herder replicates them from B->A. Without both herders (and both MirrorSourceConnectors) running in both directions, you don't see any replicated heartbeats, even tho data is being replicated!

For this reason, MM2 always creates all herders for all cluster-cluster pairs. For flows that are disabled, MirrorSourceConnector just doesn't create any replication tasks.

I suspect we should do the same with heartbeats and checkpoints. If these are disabled, the herder should still get created, the Connectors should still run, but they should just do nothing.

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

ning2008wisc commented Jan 13, 2020

@ryannedolan thanks for providing your thoughts on the design. I have a couple of questions:

(1) if heartbeat is to detect the liveness of the system which is important with minimum traffic/load, should we still expose the knob to turn off the heartbeat or always make it on?

(2) I may not see how the heartbeat topic will be used or consumed in the kafka open-source code, so does it mean the consumption of the heartbeat topic leave up to the users to implement some healthiness monitor/tool in house?

@ryannedolan
Copy link
Copy Markdown
Contributor

(1) if heartbeat is to detect the liveness of the system which is important with minimum traffic/load, should we still expose the knob to turn off the heartbeat or always make it on?

Heartbeats are necessary for RemoteClusterUtils to work, e.g. upstreamClusters() just looks for replicated heartbeat topics. However, technically everything works so long as the topics exist -- even if there are no actual heartbeat records. I think, ideally, the MirrorHeartbeatConnector would always run and always create the appropriate topics, even if heartbeats are disabled. Then, emit.heartbeats.enabled=false would just not run the tasks, or run them as no-ops like you've implemented.

(2) I may not see how the heartbeat topic will be used or consumed in the kafka open-source code, so does it mean the consumption of the heartbeat topic leave up to the users to implement some healthiness monitor/tool in house?

Heartbeat messages are not consumed by the open-source code, tho you are correct in assuming that third-party tooling (such as Cloudera's Streams Replication Manager) has been built to use them. But again, the topics are indeed used by RemoteClusterUtils -- just not the records.

It's also worth noting that heartbeats could come from external systems. Anything could emit heartbeats in any format, and the RemoteClusterUtils stuff would still work.

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

thanks so much for the details.
@ryannedolan @mimaison Since this still seems a bug, after discussion, could we align on the approach to fix this? For example, MirrorHeartbeatConnector is still created, but no task is created, when emit.heartbeats.enabled=false ?

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@mimaison @ryannedolan see if the updated PR works better, I tested it in my environment.

Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach 👌 lgtm, thanks!

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@mimaison any thoughts on this approach? Thanks

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

hi @mimaison understand that this is not super critical, but since it is a simple fix, just want to check in and see if you think this approach is acceptable. would like to hear your thoughts and comments.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for !config.enabled() here too?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we want to emit heartbeats even if A->B.enabled=false (see my earlier comment). Basically, we want heartbeats to be sent even if replication is disabled, unless heartbeats are explicitly disabled with emit.heartbeats.enabled = false.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, maybe worth adding that the explanation to the comment

Copy link
Copy Markdown
Contributor Author

@ning2008wisc ning2008wisc Jan 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, I will add the the explanation to the comment in the next update

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isNegative() is not highly recommended, what about this change? Should we still check isNegative() or return the interval as Duration.ofMillis(Long.MAX_VALUE) ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like allowing intervals to be negative interval to disable connectors is not a great pattern. Instead preventing the interval to be negative and relying solely on enabled() would have been a better alternative.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryannedolan , as @mimaison suggested, if emit.checkpoints.enabled = false, do you think if it is a better alternative to return Long.MAX_VALUE, rather than negative?

    Duration emitCheckpointsInterval() {
        if (getBoolean(EMIT_CHECKPOINTS_ENABLED)) {
            return Duration.ofSeconds(getLong(EMIT_CHECKPOINTS_INTERVAL_SECONDS));
        } else {
            // negative interval to disable
            return Duration.ofMillis(Long.MAX_VALUE);
        }
    }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the interval configs (sync topics acls, sync topic configs, refresh topics, ...) use negative values to indicate that the Scheduler should not run the corresponding task. I suppose we could have used an Optional or something instead. If we drop the negatives here, we should do so with all the other tasks. Mox nix to me.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not ideal but yes we don't want to change the current behaviour here. Let's keep the negative values.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great, thanks for all of your inputs. Then I suppose the only change in the next update is to adding the explanation to the comment on why we use isNegative()to disable theheartbeatandcheckpoint` emissions.

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@ryannedolan @mimaison updated the comments in the code, please take another review when possible. Thanks

Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thx again!

Copy link
Copy Markdown
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for all the back and forth so such a small change but can you add a couple of small tests that cover this change?

@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@mimaison tests are added. Thanks for your great feedback

@mimaison
Copy link
Copy Markdown
Member

@ning2008wisc Thanks for the quick update. The change looks good but it's failing checkstyle as there's a couple of unused imports.
To run checkstyle, run:

gradle
./gradlew connect:mirror:test

`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to
be the knobs to control if the heartbeat message or checkpoint message
will be sent or not to the topics respectively. In our experiments,
setting them to false will not suspend the activity in their SourceTasks,
e.g. MirrorHeartbeatTask, MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of
`SourceRecord` are being sent without interval, causing significantly high
CPU usage and GC time  of MirrorMaker 2 instance and congesting the single
partition of the heartbeat topic and checkpoint topic.

The proposed fix in the following PR is to (1) explicitly check if `interval`
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative,
no task is created.
@ning2008wisc
Copy link
Copy Markdown
Contributor Author

@mimaison thanks for showing how to do checkstyle. Now my local run of checkstyle returns success.

@mimaison
Copy link
Copy Markdown
Member

retest this please

Copy link
Copy Markdown
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mimaison mimaison merged commit f217752 into apache:trunk Jan 30, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 2, 2020
Conflicts and/or compiler errors due to the fact that we
temporarily reverted the commit that removes
Scala 2.11 support:

* SslAdminIntegrationTest: keep using JAdminClient,
take upstream changes otherwise.
* ReassignPartitionsClusterTest: keep using
JAdminClient, take upstream changes otherwise.
* KafkaApis: use `asScala.foreach` instead of
`forEach`.

# By Ismael Juma (3) and others
# Via GitHub
* apache-github/trunk: (22 commits)
  KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (apache#7994)
  KAFKA-9375: Add names to all Connect threads (apache#7901)
  MINOR: Introduce 2.5-IV0 IBP (apache#8010)
  KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (apache#8011)
  Add retries to release.py script (apache#8021)
  KAFKA-8162: IBM JDK Class not found error when handling SASL (apache#6524)
  MINOR: Add explicit result type in public defs/vals (apache#7993)
  KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (apache#7940)
  KAFKA-9474: Adds 'float64' to the RPC protocol types (apache#8012)
  KAFKA-9360: Allow disabling MM2 heartbeat and checkpoint emissions (apache#7887)
  KAFKA-7658: Add KStream#toTable to the Streams DSL (apache#7985)
  KAFKA-9445: Allow adding changes to allow serving from a specific partition (apache#7984)
  KAFKA-9422: Track the set of topics a connector is using (KIP-558) (apache#8017)
  KAFKA-9040; Add --all option to config command (apache#7607)
  KAFKA-4203: Align broker default for max.message.bytes with Java producer default (apache#4154)
  KAFKA-9426: Use switch instead of chained if/else in OffsetsForLeaderEpochClient (apache#7959)
  KAFKA-9405: Use Map.computeIfAbsent where applicable (apache#7937)
  KAFKA-9026: Use automatic RPC generation in DescribeAcls (apache#7560)
  MINOR: Remove unused fields in StreamsMetricsImpl (apache#7992)
  KAFKA-9460: Enable only TLSv1.2 by default and disable other TLS protocol versions (KIP-553) (apache#7998)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants