Skip to content

KAFKA-9796; Broker shutdown could be stuck forever under certain conditions#8448

Merged
rajinisivaram merged 6 commits intoapache:trunkfrom
dajac:KAFKA-9796
Apr 16, 2020
Merged

KAFKA-9796; Broker shutdown could be stuck forever under certain conditions#8448
rajinisivaram merged 6 commits intoapache:trunkfrom
dajac:KAFKA-9796

Conversation

@dajac
Copy link
Copy Markdown
Member

@dajac dajac commented Apr 8, 2020

This patch reworks the SocketServer to always start the acceptor threads after the processor threads and to always stop the acceptor threads before the processor threads. It ensures that the acceptor shutdown is not blocked waiting on the processors to be fully shutdown by decoupling the shutdown signal and the awaiting. It also ensure that the processor threads drain its newConnection queue to unblock acceptors that may be waiting. However, the acceptors still bind during the startup, only the processing of new connections and requests is further delayed.

The flow looks like this now:

val socketServer = ...

socketServer.startup(startProcessingRequests = false)
// Acceptors are bound.

socketServer.startProcessingRequests(authorizerFutures)
// Acceptors and Processors process new connections and requests

socketServer.stopProcessingRequests()
// Acceptors and Processors are stopped

socketServer.shutdown()
// SocketServer is shutdown (metrics, etc.)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma ijuma requested a review from rajinisivaram April 8, 2020 13:47
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Apr 8, 2020

always stop the processor threads before the acceptor threads.

I would have thought we want to stop accepting before we stop processing.

@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 8, 2020

always stop the processor threads before the acceptor threads.

I would have thought we want to stop accepting before we stop processing.

Indeed, it is a bit counter intuitive. The reason behind is that the Acceptor can be blocked by the Processor and thus can't be shutdown when it happens. We could probably keep a more intuitive ordering by decoupling the shutdown and the awaiting of the shutdown. Let me check this.

@dajac dajac changed the title [WIP] KAFKA-9796; Broker shutdown could be stuck forever under certain conditions KAFKA-9796; Broker shutdown could be stuck forever under certain conditions Apr 9, 2020
@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 9, 2020

@rajinisivaram The PR is ready to be reviewed.

@rajinisivaram
Copy link
Copy Markdown
Contributor

ok to test

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac Thanks for the PR, looks good. Left some minor comments.

* is used to delay processing client connections until server is fully initialized, e.g.
* to ensure that all credentials have been loaded before authentications are performed.
* Acceptors are always started during `startup` so that the bound port is known when this
* method completes even when ephemeral ports are used. Incoming connections on this server
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines are still true, but removed from the comment?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially. The acceptors are not started but start to listen. Let me rework the comment to include the part about the bound port though.

* is used to delay processing client connections until server is fully initialized, e.g.
* to ensure that all credentials have been loaded before authentications are performed.
* Acceptors are always started during `startup` so that the bound port is known when this
* method completes even when ephemeral ports are used. Incoming connections on this server
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines are still true, but removed from the comment?

* This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]]
* was invoked with `startupProcessors=false`.
* Start processing requests and new connections. This method is used for delayed starting of
* data-plane processors if [[kafka.network.SocketServer#startup]] was invoked with
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not just data-plane processors?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Let me rework the comment.

* listener before other listeners. This allows authorization metadata for other listeners to be
* stored in Kafka topics in this cluster.
*
* @param authorizerFutures
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a description?

*/
private def closeAll(): Unit = {
// Clear to unblock blocked acceptors
newConnections.asScala.foreach(_.close())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The blocked acceptor would then add another connection to this list right? Do we close that one?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't close that one. Let me rework this.

externalReadyFuture.complete(null)
TestUtils.waitUntilTrue(() => listenerStarted(externalListener), "External listener not started")
} finally {
externalReadyFuture.complete(null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? If it for the failure case, then perhaps it should be in a catch block?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed. It is a left over from my debugging. Let me remove it.

connect(testableServer, new ListenerName("EXTERNAL"), localAddr = InetAddress.getLocalHost)

// Wait to let the acceptor accepts the connections
Thread.sleep(100)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace sleep with some condition?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reworked this test. It still consistently fails without this patch and it does not have the sleep any more.

@rajinisivaram
Copy link
Copy Markdown
Contributor

retest this please

@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 15, 2020

@rajinisivaram Thanks for the review! I have addressed on your comments. Could you please have another look at it?

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac Thanks for the updates, looks good. Left just a couple of minor comments.

* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
* Initiates a graceful shutdown by signaling to stop
*/
def shutdown(): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this method to be initiateShutdown() to be consistent with kafka.utils.ShutdownableThread?

while (!newConnections.isEmpty) {
newConnections.poll().close()
}
newConnections.clear()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clear() is unnecessary since we would expect the loop to clear (i.e. we shouldn't have code that clears without closing).

@rajinisivaram
Copy link
Copy Markdown
Contributor

retest this please

@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 16, 2020

@rajinisivaram Thanks. I have addressed your comments.

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac Thanks for the updates, LGTM.

@rajinisivaram
Copy link
Copy Markdown
Contributor

ok to test

@rajinisivaram
Copy link
Copy Markdown
Contributor

retest this please

@rajinisivaram
Copy link
Copy Markdown
Contributor

Test failure not related, merging to trunk.

@rajinisivaram rajinisivaram merged commit 9a36d9f into apache:trunk Apr 16, 2020
@dajac dajac deleted the KAFKA-9796 branch April 16, 2020 16:16
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Apr 19, 2020
* 'trunk' of github.com:apache/kafka: (28 commits)
  MINOR: cleanup RocksDBStore tests  (apache#8510)
  KAFKA-9818: Fix flaky test in RecordCollectorTest (apache#8507)
  MINOR: reduce impact of trace logging in replica hot path (apache#8468)
  KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence (apache#8475)
  KAFKA-9881: Convert integration test to verify measurements from RocksDB to unit test (apache#8501)
  MINOR: improve test coverage for dynamic LogConfig(s) (apache#7616)
  MINOR: Switch order of sections on tumbling and hopping windows in streams doc. Tumbling windows are defined as "special case of hopping time windows" - but hopping windows currently only explained later in the docs. (apache#8505)
  KAFKA-9819: Fix flaky test in StoreChangelogReaderTest (apache#8488)
  HOTFIX: fix active task process ratio metric recording
  KAFKA-9796; Ensure broker shutdown is not stuck when Acceptor is waiting on connection queue (apache#8448)
  MINOR: Use streaming iterator with decompression buffer when building offset map (apache#8494)
  Add log message in release.py (apache#8461)
  KAFKA-9854 Re-authenticating causes mismatched parse of response (apache#8471)
  KAFKA-9838; Add log concurrency test and fix minor race condition (apache#8476)
  KAFKA-9703; Free up compression buffer after splitting a large batch
  KAFKA-9779: Add Stream system test for 2.5 release (apache#8378)
  KAFKA-7885: TopologyDescription violates equals-hashCode contract. (apache#6210)
  MINOR: KafkaApis#handleOffsetDeleteRequest does not group result correctly (apache#8485)
  HOTFIX: don't close or wipe out someone else's state (apache#8478)
  MINOR: add process(Test)Messages to the README (apache#8480)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants