Skip to content

HOTFIX: don't close or wipe out someone else's state#8478

Merged
guozhangwang merged 3 commits intoapache:trunkfrom
ableegoldman:HOTFIX-dont-close-someone-elses-state
Apr 15, 2020
Merged

HOTFIX: don't close or wipe out someone else's state#8478
guozhangwang merged 3 commits intoapache:trunkfrom
ableegoldman:HOTFIX-dont-close-someone-elses-state

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

When it comes to actually closing a task we now treat all states exactly the same, and call StateManagerUtil#closeStateManager regardless of whether it's in CREATED or RESTORING or RUNNING

Unfortunately StateManagerUtil doesn't actually check to make sure that we actually own the lock for this task's state. During a dirty close with eos enabled, we wipe the state -- but in some cases, this means deleting the state out from under another StreamThread who is still in the process of revoking this task

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Apr 14, 2020

Retest this please.

@mjsax mjsax added the streams label Apr 14, 2020
Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While reviewing the PR I had a meta question: the issue would happen if the previous owner thread has not released the lock while the new owner thread is going to close the task. The new thread will try to lock the dir while it is transiting to RESTORING, but the stateDirectory.lock(id) will return false immediately if the lock is at hands of another thread within this JVM, which is totally possible if one thread is closing the task while the other is grabbing the task.

So that means in StateManagerUtil#registerStateStores it is very possible that the lock(id) function would return false, which would be interpreted as fatal error -- more specifically, with incremental protocol since we always close the task before it is reassigned, it may not happen, but with the eager protocol it is very possible this will happen. Right? Is that an overlooked issue?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change intentional?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, sorry I meant to leave a comment. I noticed this is redundant since cleanupTask (invoked by closeTaskDirty) does the same thing

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! SG.

@ableegoldman
Copy link
Copy Markdown
Member Author

in StateManagerUtil#registerStateStores it is very possible that the lock(id) function would return false, which would be interpreted as fatal error

@guozhangwang Can you clarify? If the task fails to lock the directory in registerStateStores it throws a LockException, which is ultimately caught and handled by the TaskManager in tryToCompleteRestoration. It'll just stay in CREATED and try again on the next loop right?

@ableegoldman ableegoldman force-pushed the HOTFIX-dont-close-someone-elses-state branch from 61e1b8f to dad39b7 Compare April 14, 2020 23:07
public void testCloseStateManagerDirtyShallSwallowException() throws IOException {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

public void testCloseStateManagerThrowsExceptionWhenDirty() throws IOException {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually were already throwing some ProcessorStateExceptions up through close even when unclean, which I think was the cause of a bug we resolved a few weeks ago. Now we just make no assumptions about whether this will throw or not, and catch any exceptions in StreamTask / StandbyTask if it's a dirty close.

@guozhangwang
Copy link
Copy Markdown
Contributor

test this

1 similar comment
@guozhangwang
Copy link
Copy Markdown
Contributor

test this

@guozhangwang
Copy link
Copy Markdown
Contributor

test this!

@guozhangwang
Copy link
Copy Markdown
Contributor

test this

1 similar comment
@guozhangwang
Copy link
Copy Markdown
Contributor

test this

@guozhangwang
Copy link
Copy Markdown
Contributor

@guozhangwang Can you clarify? If the task fails to lock the directory in registerStateStores it throws a LockException, which is ultimately caught and handled by the TaskManager in tryToCompleteRestoration. It'll just stay in CREATED and try again on the next loop right?

Ah yes, that's right.

@ableegoldman
Copy link
Copy Markdown
Member Author

Failures unrelated:
ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]

@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made another pass, after addressing this should be good to merge.

} else {
log.warn("Closing {} task {} uncleanly and swallows an exception", taskType, id, exception);
}
throw exception;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice clean, now we catch the exception in the caller.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! SG.

replayAll();

StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can set eosEnabled to false since we set it to true in the next test?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call

@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

1 similar comment
@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

@guozhangwang guozhangwang merged commit 640be46 into apache:trunk Apr 15, 2020
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Apr 19, 2020
* 'trunk' of github.com:apache/kafka: (28 commits)
  MINOR: cleanup RocksDBStore tests  (apache#8510)
  KAFKA-9818: Fix flaky test in RecordCollectorTest (apache#8507)
  MINOR: reduce impact of trace logging in replica hot path (apache#8468)
  KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence (apache#8475)
  KAFKA-9881: Convert integration test to verify measurements from RocksDB to unit test (apache#8501)
  MINOR: improve test coverage for dynamic LogConfig(s) (apache#7616)
  MINOR: Switch order of sections on tumbling and hopping windows in streams doc. Tumbling windows are defined as "special case of hopping time windows" - but hopping windows currently only explained later in the docs. (apache#8505)
  KAFKA-9819: Fix flaky test in StoreChangelogReaderTest (apache#8488)
  HOTFIX: fix active task process ratio metric recording
  KAFKA-9796; Ensure broker shutdown is not stuck when Acceptor is waiting on connection queue (apache#8448)
  MINOR: Use streaming iterator with decompression buffer when building offset map (apache#8494)
  Add log message in release.py (apache#8461)
  KAFKA-9854 Re-authenticating causes mismatched parse of response (apache#8471)
  KAFKA-9838; Add log concurrency test and fix minor race condition (apache#8476)
  KAFKA-9703; Free up compression buffer after splitting a large batch
  KAFKA-9779: Add Stream system test for 2.5 release (apache#8378)
  KAFKA-7885: TopologyDescription violates equals-hashCode contract. (apache#6210)
  MINOR: KafkaApis#handleOffsetDeleteRequest does not group result correctly (apache#8485)
  HOTFIX: don't close or wipe out someone else's state (apache#8478)
  MINOR: add process(Test)Messages to the README (apache#8480)
  ...
@ableegoldman ableegoldman deleted the HOTFIX-dont-close-someone-elses-state branch April 20, 2020 22:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants