Skip to content

(in progress) Fix duplication on shutdown fixes #23#34

Closed
ocadaruma wants to merge 3 commits intoline:masterfrom
ocadaruma:fix-duplication-on-shutdown
Closed

(in progress) Fix duplication on shutdown fixes #23#34
ocadaruma wants to merge 3 commits intoline:masterfrom
ocadaruma:fix-duplication-on-shutdown

Conversation

@ocadaruma
Copy link
Copy Markdown
Member

@ocadaruma ocadaruma commented Apr 17, 2020

Summary

Decaton allows process duplication by design.
However, it's expected duplication not to occur in ordinary processing situation. ordinary situation here means:

  • All consumer and brokers works properly
  • All tasks is completed within CONFIG_GROUP_REBALANCE_TIMEOUT_MS on partition revocation

Currently, process can be duplicated even in ordinary situation above due to 3 issues.

This PR fixes them.

Issue 1: ProcessPipeline returns immediately without doing process after terminated

  • This causes duplication by following sequence:

(tasks are denoted by [offset: key])
Say partition contains [1:a], [2:b], [3:b], [4:a], and partition concurrency is set to 2 (threadX,Y).
And key a will be routed to threadX, key b will be routed to threadY.

  1. threadX's queue will be like this: [1:a], [4:a]
  2. threadY's queue will be like this: [2:b], [3:b]
  3. OOOCC will be like this: [1(n), 2(n), 3(n), 4(n)] hw=0
    • (n) means not completed. (c) means completed.
  4. threadX completes [1:a] and [4:a]
  5. threadY completes only [2:b]
  6. OOOCC will be like this: [1(c), 2(c), 3(n), 4(c)] hw=2
  7. Initiate shutdown
  8. ProcessorPipeline returns without completing [3:b], hence [4:a] will be processed again after rebalance

Solution

Revert to process all queued tasks.

Issue 2: Kafka 2.4 has changed to call onPartitionsRevoked on shutdown

Kafka clients changed the behavior to call onPartitionRevoked on KafkaConsumer#close. (apache/kafka#6884)

This can cause duplication as below:

Say OOOCC is like this [1(c), 2(c), 3(n), 4(c)] hw=2.

  1. initiate shutdown
  2. All processors are destroyed (
    processors.destroySingletonScope(scope.subscriptionId());
    )
  3. consumer#close ( )
  4. revocation handler waits all pending tasks to complete, but since all processors are destroyed, 3(n) will not be completed by anyone (
    waitForRemainingTasksCompletion(rebalanceTimeoutMillis.value());
    )
  5. exceeds CONFIG_GROUP_REBALANCE_TIMEOUT_MS, 4(c) will be processed again after rebalance

Solution

onPartitionRevoked immediately returns if ProcessorSubscription already terminated.

Issue 3: ProcessorSubscription doesn't wait pending tasks on shutdown

Solution

wait all pending tasks on shutdown

@ocadaruma ocadaruma force-pushed the fix-duplication-on-shutdown branch from 3140cbc to 6370afd Compare April 17, 2020 04:49
@ocadaruma
Copy link
Copy Markdown
Member Author

ocadaruma commented Apr 17, 2020

As described above, to achieve "duplication not to occur in ordinary processing situation", it's required to wait all pending tasks.

But it brings another risk that, the possibility of ungraceful shutdown.
Say there are some pending tasks on ProcessorUnit's queue.

Waiting them all makes shutdown duration longer certainly.
We often set a timeout for shutdown to rollout new application soon, and once shutdown time exceeds timeout, often we have no choice other than killing the app ungracefully.

It can cause unpredictable result which possibly affects service.

In conclusion, I close this PR for now.
But this PR includes another useful changes which adds integration test to check core functionality.
I'll create an another PR for it.

@ocadaruma ocadaruma closed this Apr 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant