Skip to content

MINOR: retry upon missing source topic#20284

Merged
mjsax merged 6 commits intoapache:trunkfrom
RaidenE1:retry-upon-missing-source-topic
Aug 6, 2025
Merged

MINOR: retry upon missing source topic#20284
mjsax merged 6 commits intoapache:trunkfrom
RaidenE1:retry-upon-missing-source-topic

Conversation

@RaidenE1
Copy link
Copy Markdown
Contributor

@RaidenE1 RaidenE1 commented Jul 31, 2025

Implements a timeout mechanism (using maxPollTimeMs) that waits for
missing source topics to be created before failing, instead of
immediately throwing exceptions in the new Streams protocol.
Additionally, throw TopologyException when partition count mismatch is
detected.

Reviewers: Lucas Brutschy lbrutschy@confluent.io, Alieh Saeedi
asaeedi@confluent.io, Matthias J. Sax matthias@confluent.io

@github-actions github-actions Bot added triage PRs from the community streams labels Jul 31, 2025
Copy link
Copy Markdown
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly looking good to me. I left a few comments to cleanup/simplify the code.

private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();

// Missing source topic timeout tracking
private long firstMissingSourceTopicTime = -1L;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would make things slighly more easy to read if we'd use
org.apache.kafka.common.utils.Timer for this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we rename this to a more generic topicsReadyTimer? I think we may want to reuse the timer to also time out when internal topics are not created in time.

private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();

// Missing source topic timeout tracking
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you describe a member, I'd use a javadoc comment. But this comment isn't adding anything on top of the variable name, so maybe we can drop it altogether?

handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
} else {
// Reset timeout tracking when no missing source topics are reported
firstMissingSourceTopicTime = -1L;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if you use org.apache.kafka.common.utils.Timer and call reset here, you don't need the inline comment.

@github-actions github-actions Bot removed the triage PRs from the community label Aug 2, 2025

// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);

Copy link
Copy Markdown
Contributor

@aliehsaeedii aliehsaeedii Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: advance time less than 5 min and check if 2nd call throws exception and also check the log message (if easy) and then next step advancing time beyond 5 min as you did!


// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();

Copy link
Copy Markdown
Contributor

@aliehsaeedii aliehsaeedii Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same suggestion (below) here.

@aliehsaeedii
Copy link
Copy Markdown
Contributor

@RaidenE1 Thank you, the PR looks good to me now. I had a suggestion, but it’s not necessary to address since you’re already checking the condition in a later test.

log.error(errorMsg);

// Reset timer for next timeout cycle
topicsReadyTimer.updateAndReset(maxPollTimeMs);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to update the timer? We throw MissingSourceTopicException below, and this should shut down the thread?

final String errorMsg = status.statusDetail();
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
throw new TopologyException(errorMsg);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this case is newly added, but we did not add a new test for it?

@mjsax mjsax merged commit 03190e4 into apache:trunk Aug 6, 2025
20 of 24 checks passed
@RaidenE1 RaidenE1 deleted the retry-upon-missing-source-topic branch September 18, 2025 13:39
lucasbru pushed a commit that referenced this pull request Oct 22, 2025
…eleted (#20735)

Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
#20284](#20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).

This is inappropriate because:
- When source topics don't exist, actual poll frequency is `poll.ms`
(100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
instead:
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
`StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
 <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@lucasbru lucasbru added the KIP-1071 PRs related to KIP-1071 label Oct 23, 2025
joshua2519 pushed a commit to joshua2519/kafka that referenced this pull request Oct 27, 2025
…eleted (apache#20735)

Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
apache#20284](apache#20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).

This is inappropriate because:
- When source topics don't exist, actual poll frequency is `poll.ms`
(100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
instead:
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
`StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
 <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…eleted (apache#20735)

Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
apache#20284](apache#20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).

This is inappropriate because:
- When source topics don't exist, actual poll frequency is `poll.ms`
(100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
instead:
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
`StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
 <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025
…eleted (apache#20735)

Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
apache#20284](apache#20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).

This is inappropriate because:
- When source topics don't exist, actual poll frequency is `poll.ms`
(100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
instead:
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
`StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
 <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
shashankhs11 pushed a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…eleted (apache#20735)

Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
apache#20284](apache#20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).

This is inappropriate because:
- When source topics don't exist, actual poll frequency is `poll.ms`
(100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
instead:
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
`StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
 <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved KIP-1071 PRs related to KIP-1071 streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants