
KAFKA-9422: Track the set of topics a connector is using (KIP-558) #8017

Merged
rhauch merged 7 commits into apache:trunk from kkonstantine:kafka-9422
Jan 30, 2020
Conversation

@kkonstantine
Contributor

@kkonstantine kkonstantine commented Jan 29, 2020

This feature corresponds to KIP-558 and extends the use of the status.storage.topic to include information that allows Kafka Connect to keep track of which topics a connector is using.

The set of topics a connector is actively using is exposed via a new endpoint added to the REST API of Connect workers.

  • A GET /connectors/{name}/topics request will return the set of topics that have been recorded as active since a connector started or since the set of topics was reset for this connector.

A second endpoint allows users to reset the set of active topics for a connector:

  • A PUT /connectors/{name}/topics/reset request clears the set of active topics.

An operator may enable or disable this feature by setting:
topic.tracking.enable (true by default).

Additionally, an operator may disable only reset requests via the Connect REST API by setting:
topic.tracking.allow.reset to false (true by default).
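The endpoint and config semantics above can be sketched as a tiny in-memory model. Note this is illustrative only: the class and method names below are hypothetical and are not the actual Connect implementation.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of active-topic tracking per connector (not the PR's code).
class ActiveTopicsModel {
    private final boolean trackingEnabled;  // mirrors topic.tracking.enable
    private final boolean allowReset;       // mirrors topic.tracking.allow.reset
    private final Map<String, Set<String>> activeTopics = new ConcurrentHashMap<>();

    ActiveTopicsModel(boolean trackingEnabled, boolean allowReset) {
        this.trackingEnabled = trackingEnabled;
        this.allowReset = allowReset;
    }

    // Called when a task of the connector reads from or writes to a topic.
    void record(String connector, String topic) {
        if (!trackingEnabled) {
            return;
        }
        activeTopics.computeIfAbsent(connector,
                c -> Collections.synchronizedSet(new LinkedHashSet<>())).add(topic);
    }

    // Corresponds to GET /connectors/{name}/topics
    Set<String> topics(String connector) {
        return activeTopics.getOrDefault(connector, Collections.emptySet());
    }

    // Corresponds to PUT /connectors/{name}/topics/reset
    void reset(String connector) {
        if (!allowReset) {
            throw new UnsupportedOperationException("Topic tracking reset is disabled");
        }
        activeTopics.remove(connector);
    }
}
```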

In the current commit the code for KIP-558 is feature complete.

It's tested by extending the current unit tests. Because topic tracking requires changes to the integration testing framework itself, integration tests will be submitted in a follow-up pull request. Connect system tests will also be added in a separate pull request if coverage beyond integration testing is required.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@kkonstantine
Contributor Author

cc @rhauch this is ready for a first pass. thx!

@kkonstantine
Contributor Author

retest this please

@kkonstantine kkonstantine requested a review from rhauch January 29, 2020 19:08
Contributor Author


note: That's a simple rename, because the member variable topic is easily masked by a local variable with this new feature.

@ijuma
Member

ijuma commented Jan 29, 2020

ok to test

@kkonstantine
Contributor Author

ok to test

Contributor

@rhauch rhauch left a comment


Fantastic work, @kkonstantine. I like the simplicity of this implementation of KIP-558. I do have a few questions/suggestions below, all of which are pretty minor.

Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java Outdated
Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java Outdated
Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java Outdated
Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java Outdated
expect.andAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
Contributor


WDYT about doing something like the following to reduce the duplicate code:

IAnswer<TopicStatus> returnStatus = () -> new TopicStatus(
        topicCapture.getValue(),
        new ConnectorTaskId(connectorCapture.getValue(), 0),
        Time.SYSTEM.milliseconds());
if (anyTimes) {
    expect.andStubAnswer(returnStatus);
} else {
    expect.andAnswer(returnStatus);
}

and doing the same in the other two similar methods?

Contributor Author


I'm fuzzy on the details right now, but it seems the type can't be inferred if the lambda isn't given inline.
Maybe we can optimize in a follow-up that includes the other calls that add two implementations of IAnswer.

Contributor


Yeah, this was a minor suggestion, and it still leaves two other similar methods in place. Maybe we should refactor later.

Contributor Author

@kkonstantine kkonstantine left a comment


Thanks @rhauch for the quick review.
Please find replies inline.

Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java Outdated
Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java Outdated
RetryWithToleranceOperator retryWithToleranceOperator) {
RetryWithToleranceOperator retryWithToleranceOperator,
Time time,
StatusBackingStore statusBackingStore) {
Contributor Author


My main issue is that the new methods seem to belong to the StatusBackingStore interface. Splitting them out into a different one (let's say TopicTracker) would separate concerns here, but would also add a level of indirection that might be more complex to follow. wdyt?
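For comparison, the alternative floated here could look roughly like the sketch below. TopicTracker, the stub class, and the in-memory implementation are all hypothetical illustrations of the trade-off, not code from this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-in for the real TopicStatus (topic name plus owning connector).
class TopicStatusStub {
    final String topic;
    final String connector;

    TopicStatusStub(String topic, String connector) {
        this.topic = topic;
        this.connector = connector;
    }
}

// Hypothetical separate interface for the topic-tracking concern.
interface TopicTracker {
    void put(TopicStatusStub status);                     // record an active topic
    Collection<TopicStatusStub> getAll(String connector); // active topics of a connector
    void deleteAll(String connector);                     // reset the set
}

// Trivial in-memory implementation, just to make the shape concrete.
class InMemoryTopicTracker implements TopicTracker {
    private final Map<String, Map<String, TopicStatusStub>> byConnector = new LinkedHashMap<>();

    @Override
    public void put(TopicStatusStub status) {
        byConnector.computeIfAbsent(status.connector, c -> new LinkedHashMap<>())
                   .put(status.topic, status);
    }

    @Override
    public Collection<TopicStatusStub> getAll(String connector) {
        return new ArrayList<>(byConnector.getOrDefault(connector, new LinkedHashMap<>()).values());
    }

    @Override
    public void deleteAll(String connector) {
        byConnector.remove(connector);
    }
}
```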

Map<String, Object> topicStatusMetadata = (Map<String, Object>) innerValue;
return new TopicStatus((String) topicStatusMetadata.get(TOPIC_NAME_KEY),
(String) topicStatusMetadata.get(TOPIC_CONNECTOR_KEY),
((Long) topicStatusMetadata.get(TOPIC_TASK_KEY)).intValue(),
Contributor Author


My intention was also to be more fault tolerant. But I also didn't want to diverge from the pattern used for the rest of the values here (e.g. #parseConnectorStatus). I'd suggest refactoring in a separate PR for all.
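As a sketch of what a more fault-tolerant variant of the cast-heavy parsing could look like (the helper, nested class, and literal key names below are illustrative, not the pattern currently used):

```java
import java.util.Map;

// Hypothetical defensive variant of the unchecked-cast parsing above; returns null
// for malformed records instead of throwing ClassCastException or NullPointerException.
class DefensiveTopicStatusParser {
    static class ParsedTopicStatus {
        final String topic;
        final String connector;
        final int task;

        ParsedTopicStatus(String topic, String connector, int task) {
            this.topic = topic;
            this.connector = connector;
            this.task = task;
        }
    }

    static ParsedTopicStatus parse(Map<String, Object> metadata) {
        Object topic = metadata.get("topic");
        Object connector = metadata.get("connector");
        Object task = metadata.get("task");
        // Validate every field's type before casting.
        if (!(topic instanceof String) || !(connector instanceof String) || !(task instanceof Long)) {
            return null;
        }
        return new ParsedTopicStatus((String) topic, (String) connector, ((Long) task).intValue());
    }
}
```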

Contributor

@rhauch rhauch left a comment


LGTM. Great work, @kkonstantine!

@rhauch
Contributor

rhauch commented Jan 30, 2020

ok to test

@rhauch
Contributor

rhauch commented Jan 30, 2020

retest this please

@rhauch
Contributor

rhauch commented Jan 30, 2020

The PR doesn't show the build results, but:

The MirrorConnectorsIntegrationTest test failure appears to be a flaky test (see previously reported KAFKA-9013) that fails with:

java.lang.RuntimeException: Could not find enough records. found 0, expected 100
	at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:334)
	at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:198)

I tried 6 builds of the trunk branch, and 4 passed and 2 failed with the same apparently flaky test.

Therefore, I don't think this PR caused the flaky test, so I will merge.

@rhauch rhauch merged commit 7746301 into apache:trunk Jan 30, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 2, 2020
Conflicts and/or compiler errors due to the fact that we
temporarily reverted the commit that removes
Scala 2.11 support:

* SslAdminIntegrationTest: keep using JAdminClient,
take upstream changes otherwise.
* ReassignPartitionsClusterTest: keep using
JAdminClient, take upstream changes otherwise.
* KafkaApis: use `asScala.foreach` instead of
`forEach`.

# By Ismael Juma (3) and others
# Via GitHub
* apache-github/trunk: (22 commits)
  KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (apache#7994)
  KAFKA-9375: Add names to all Connect threads (apache#7901)
  MINOR: Introduce 2.5-IV0 IBP (apache#8010)
  KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (apache#8011)
  Add retries to release.py script (apache#8021)
  KAFKA-8162: IBM JDK Class not found error when handling SASL (apache#6524)
  MINOR: Add explicit result type in public defs/vals (apache#7993)
  KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (apache#7940)
  KAFKA-9474: Adds 'float64' to the RPC protocol types (apache#8012)
  KAFKA-9360: Allow disabling MM2 heartbeat and checkpoint emissions (apache#7887)
  KAFKA-7658: Add KStream#toTable to the Streams DSL (apache#7985)
  KAFKA-9445: Allow adding changes to allow serving from a specific partition (apache#7984)
  KAFKA-9422: Track the set of topics a connector is using (KIP-558) (apache#8017)
  KAFKA-9040; Add --all option to config command (apache#7607)
  KAFKA-4203: Align broker default for max.message.bytes with Java producer default (apache#4154)
  KAFKA-9426: Use switch instead of chained if/else in OffsetsForLeaderEpochClient (apache#7959)
  KAFKA-9405: Use Map.computeIfAbsent where applicable (apache#7937)
  KAFKA-9026: Use automatic RPC generation in DescribeAcls (apache#7560)
  MINOR: Remove unused fields in StreamsMetricsImpl (apache#7992)
  KAFKA-9460: Enable only TLSv1.2 by default and disable other TLS protocol versions (KIP-553) (apache#7998)
  ...
