Skip to content

KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags#7961

Merged
vvcephei merged 6 commits intoapache:trunkfrom
vinothchandar:kafka-9431-local-lag-fetch-api
Jan 16, 2020
Merged

KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags#7961
vvcephei merged 6 commits intoapache:trunkfrom
vinothchandar:kafka-9431-local-lag-fetch-api

Conversation

@vinothchandar
Copy link
Copy Markdown
Member

  • Adds KafkaStreams#allLocalOffsetLags(), which returns lag information of all active/standby tasks local to a streams instance
  • LagInfo class encapsulates the current position in the changelog, endoffset in the changelog and their difference as lag
  • Lag information is a mere estimate; it can over-estimate (source topic optimization), or under-estimate.
  • Each call to allLocalOffsetLags() generates a metadata call to Kafka brokers, so caution advised
  • Unit and Integration tests added.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vinothchandar vinothchandar force-pushed the kafka-9431-local-lag-fetch-api branch from 24d2826 to 7de9b61 Compare January 14, 2020 23:13
@vinothchandar
Copy link
Copy Markdown
Member Author

@vvcephei @brary Open this PR for lag related changes...

@vinothchandar vinothchandar changed the title [KAFKA-9431] Expose API in KafkaStreams to fetch all local offset lags KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags Jan 14, 2020
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
@vinothchandar vinothchandar force-pushed the kafka-9431-local-lag-fetch-api branch from 5ab2bee to 44a5350 Compare January 15, 2020 21:07
@vinothchandar vinothchandar changed the base branch from trunk to temp-kafka-6144-query-restoring-and-standby January 15, 2020 21:07
brary and others added 4 commits January 15, 2020 13:16
 - Adds KafkaStreams#allLocalOffsetLags(), which returns lag information of all active/standby tasks local to a streams instance
 - LagInfo class encapsulates the current position in the changelog, endoffset in the changelog and their difference as lag
 - Lag information is a mere estimate; it can over-estimate (source topic optimization), or under-estimate.
 - Each call to allLocalOffsetLags() generates a metadata call to Kafka brokers, so caution advised
 - Unit and Integration tests added.
…#standbyRestoredOffsets

 - This map already contains the source topic optimization fenced last offset seen by an standby task
 - This is updated much more real-time without waiting for checkpointing
@vinothchandar vinothchandar force-pushed the kafka-9431-local-lag-fetch-api branch from 44a5350 to eb33dba Compare January 15, 2020 21:17
@vinothchandar vinothchandar changed the base branch from temp-kafka-6144-query-restoring-and-standby to trunk January 15, 2020 21:17
@vinothchandar
Copy link
Copy Markdown
Member Author

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work on this!

Looks good overall, just a few small comments.

One other thought I had was about swapping out those internal maps for a ConcurrentHashMap. Are any of these on a hot path, such that we would worry about impacting the performance of Streams if frequent IQs are contending with processing for the striped locks?

If it is a concern, I'm wondering if they are really necessary. It seems like their main purpose is as a memory barrier, but there are other ways to add memory barriers without the synchronization overhead.

WDYT?

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated

@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
throw new UnsupportedOperationException("Not implemented yet");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this change? It looks unrelated, but I'm guessing it's related somehow to your tests?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats the variant I am using in allLocalOffsetLags.. I swear, at some point I needed it for the KafkaStreamsTest to pass .. I took another look though and test currently with a partial mock on MockAdminClient seems to pass without this change.. So will back this out. Thanks for the call out

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
private static final long CONSUMER_TIMEOUT_MS = 60000;

@Rule
public TemporaryFolder folder = new TemporaryFolder();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is neat, but we shouldn't use it. There's an IntegrationTestUtil for getting a temporary folder, which is hooked in to support for different testing environments to set their desired temporary file location.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a TestUtils#temporaryFolder which basically uses java Files.createTempDirectory/deleteOnExit route.. This is what you switched to in the QueryableStateIntegrationTest as I see ..
I ll play by house rules.. but I don't see a IntegrationTestUtil or a related method in IntegrationTestUtils .. I see one where it purges local state dir from streams config.. Is that what you are referring to..

Once you respond, I will clean up both tests

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, it's org.apache.kafka.test.TestUtils#tempDirectory(). My mistake. The protocol is for all temporary state in Kafka tests to use that method.

The change I made in QueryableStateIntegrationTest is basically what we should do here as well.

@vvcephei
Copy link
Copy Markdown
Contributor

@vinothchandar
Copy link
Copy Markdown
Member Author

@vvcephei
Copy link
Copy Markdown
Contributor

Hey @vinothchandar , I looked into the possible contention between IQ and processing, and I found that, yes, they may introduce contention, but it's probably not too much of a concern for this PR.

In most cases, these collections are only accessed during rebalance, which doesn't need to be super high performing. They are also accessed during restoration, which is more of a concern because we want the restore to complete as fast as possible. However, there would only be contention when we call the local lag API, so an easy workaround is just to call it less often if profiling shows high contention.

Also, this is a new API, so I'm not so concerned that existing code would suddenly perform worse. There will certainly be more synchronization overhead in the restore path, but there's also a lot of I/O in the restore path, and I'd generally expect the I/O to dominate.

Finally, I noticed that the collection most used in the hot processing path, org.apache.kafka.streams.processor.internals.AssignedTasks#running, was already a ConcurrentHashMap.

In conclusion, it seems reasonable to say that we may be introducing more overhead here, but it's likely a small amount. In exchange for the new feature it's probably worthwhile.

Plus, there are concurrent efforts to completely refactor the internals, and those maps would probably go away in that case. It makes sense to go ahead and take a simple and safe approach with this change, see what happens with the refactor, and then optimize performance as necessary.

Copy link
Copy Markdown
Member Author

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is documented to be called once every few seconds, not in a critical path.. There would be lot more problems on the Kafka brokers themselves, if someone calls this in a tight for-loop say.

On the maps, the need for concurrent map is mainly to provide safe iteration of the map, while removals may be going on (my conclusion after reading code in places like AssignedTasks etc where the task state management seems to be removing and adding tasks from different maps).. If you have a better suggestion, I am listening..
Not sure I followed the memory barrier comment.. Ideally, if each task had just the two longs (position, endoffset) within themselves, we can just use volatile and deal with it.. Unfortunately, the tasks seem to have maps to deal with multiple partitions (partition grouper support?). I am intending to do a follow up, along with exploring fetching of the end offset out of memory... This is also some feedback for the re-design.. @guozhangwang did confirm that the new design with have a clear lag api for Tasks..

On performance of concurrent map itself, I am not sure what exactly you are getting at. I have used it critical paths before with good success.. Can you elaborate your specific performance concern? (we can also sidebar this later, if you are happy to revisit later, given its only called every few seconds)

Note: I will respond to the larger open item on lag reporting once I am in office.

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated

@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
throw new UnsupportedOperationException("Not implemented yet");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats the variant I am using in allLocalOffsetLags.. I swear, at some point I needed it for the KafkaStreamsTest to pass .. I took another look though and test currently with a partial mock on MockAdminClient seems to pass without this change.. So will back this out. Thanks for the call out

private static final long CONSUMER_TIMEOUT_MS = 60000;

@Rule
public TemporaryFolder folder = new TemporaryFolder();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a TestUtils#temporaryFolder which basically uses java Files.createTempDirectory/deleteOnExit route.. This is what you switched to in the QueryableStateIntegrationTest as I see ..
I ll play by house rules.. but I don't see a IntegrationTestUtil or a related method in IntegrationTestUtils .. I see one where it purges local state dir from streams config.. Is that what you are referring to..

Once you respond, I will clean up both tests

@vvcephei
Copy link
Copy Markdown
Contributor

Hey @vinothchandar , Thanks for the response.

I think we're on the same page about the concurrency question.

Once you back out the MockAdminClient changes and fix the LagFetchIntegrationTest temp folder usage, we should be good to go!

Thanks again for all your work on this awesome feature.

@vinothchandar
Copy link
Copy Markdown
Member Author

@vvcephei CI passed!

@vvcephei vvcephei merged commit bbd3348 into apache:trunk Jan 16, 2020
final LagInfo other = (LagInfo) obj;
return currentOffsetPosition == other.currentOffsetPosition
&& endOffsetPosition == other.endOffsetPosition
&& this.offsetLag == other.offsetLag;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since offsetLag is computed from the other two fields, this comparison is not necessary.

private final Map<TopicPartition, Long> restoreToOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed some maps are changed to ConcurrentHashMap.
May I ask what was the selection criterion for the change ?

thanks

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for safe iteration from the thread calling the lag fetch API

ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 21, 2020
Conflicts or compilation errors due to the fact that we temporarily
reverted the commit that removes Scala 2.11 support:

* AclCommand.scala: take upstream changes.
* AclCommandTest.scala: take upstream changes.
* TransactionCoordinatorTest.scala: don't use SAMs, but adjust
mock call to putTransactionStateIfNotExists given new signature.
* TransactionStateManagerTest: use Runnable instead of SAMs.
* PartitionLockTest: use Runnable instead of SAMs.
* docs/upgrade.html: take upstream changes excluding line that
states that Scala 2.11 support has been removed.

* apache-github/trunk: (28 commits)
  KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989)
  MINOR: Update AclCommand help message to match implementation (apache#7990)
  MINOR: Update introduction page in Kafka documentation
  MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954)
  KAFKA-9338; Fetch session should cache request leader epoch (apache#7970)
  KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865)
  KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967)
  MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973)
  MINOR: Fix typo in connect integration test class name (apache#7976)
  KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745)
  KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966)
  MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971)
  [MINOR]: Fix typo in Fetcher comment (apache#7934)
  MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975)
  MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974)
  KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961)
  KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963)
  KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943)
  MINOR: Removed accidental double negation in error message. (apache#7834)
  KAFKA-6144: IQ option to query standbys (apache#7962)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants