
Conversation

@pradeepbn pradeepbn commented Nov 10, 2021

Descriptions of the changes in this PR:

Motivation

After the read cache has been shut down, all of its buffers are released. If read and write ops are still performed on the read cache at that point, they access the freed memory and cause a segmentation fault. So, this PR shuts down the request processor that performs the read and write ops before the bookie shutdown sequence.

Changes

  • Stop all the requestProcessor threads before the bookie shutdown sequence, to avoid read/write ops during bookie shutdown (see the sketch below)
  • Force-shutdown the executors to ensure that all executor threads have stopped before the bookie shuts down
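
A rough sketch of the reordered sequence follows; the interface and field names here are simplified stand-ins for the real BookKeeper components, not the exact BookieServer code:

// Illustrative only: Component stands in for the real BookKeeper server parts.
interface Component { void shutdown(); }

class BookieServerShutdownSketch {
    private final Component nettyServer;      // accepts client connections
    private final Component requestProcessor; // runs the read/write worker threads
    private final Component bookie;           // owns ledger storage and the read cache

    BookieServerShutdownSketch(Component nettyServer, Component requestProcessor, Component bookie) {
        this.nettyServer = nettyServer;
        this.requestProcessor = requestProcessor;
        this.bookie = bookie;
    }

    // New ordering: stop request traffic and its worker threads before the bookie
    // releases the read cache buffers, so no in-flight op can touch freed memory.
    synchronized void shutdown() {
        nettyServer.shutdown();
        requestProcessor.shutdown();
        bookie.shutdown();
    }
}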

How to reproduce

…are no read/ write ops while shutting down the bookie
@Vanlightly (Contributor):

The order of the shutdown looks good. One significant problem is that the shutdown of the OrderedExecutors was not done quite right, causing the shutdown to block for a significant time period (1000 seconds x number of total threads).

The forceShutdown method should only be called after shutdown() has been called, because all forceShutdown does is wait (for those 1000 seconds) for the ExecutorService to terminate, calling shutdownNow() if the timeout is reached. Since shutdown() wasn't called first, the ExecutorService is still running normally, so you need to ensure that service.shutdown() is called first.

I would also say 1000 seconds is quite high; this is the upper bound for each thread, waited on serially. If we call shutdown() on all executors first, then we could time-box the total time we're willing to wait for all executors to terminate.
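
As a rough illustration of the ordering being suggested here (a sketch using plain java.util.concurrent ExecutorService handles, not BookKeeper's actual OrderedExecutor/forceShutdown API):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorShutdownSketch {
    // Sketch only: shut everything down first, then time-box the total wait.
    static void shutdownAll(List<ExecutorService> executors, long totalTimeoutMs)
            throws InterruptedException {
        // 1. Tell every executor to stop accepting new tasks before waiting on any of them.
        for (ExecutorService service : executors) {
            service.shutdown();
        }
        // 2. Wait for termination against one shared deadline rather than per thread.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        for (ExecutorService service : executors) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0 || !service.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                // 3. Force-stop only the executors that did not drain within the budget.
                service.shutdownNow();
            }
        }
    }
}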

… active before sending response because it can be closed while responding; make bookie process as PID=1 in docker so that it can receive SIGINT
@pradeepbn (Contributor, Author):

> The order of the shutdown looks good. One significant problem is that the shutdown of the OrderedExecutors was not done quite right, causing the shutdown to block for a significant time period (1000 seconds x number of total threads).
>
> The forceShutdown method should only be called after shutdown() has been called, because all forceShutdown does is wait (for those 1000 seconds) for the ExecutorService to terminate, calling shutdownNow() if the timeout is reached. Since shutdown() wasn't called first, the ExecutorService is still running normally, so you need to ensure that service.shutdown() is called first.
>
> I would also say 1000 seconds is quite high; this is the upper bound for each thread, waited on serially. If we call shutdown() on all executors first, then we could time-box the total time we're willing to wait for all executors to terminate.

@Vanlightly as discussed, bumped it down to 10 seconds. By the time the first thread in the sequence is flushed, we expect the other threads to have flushed as well. So, the max upper bound will be 10 + 5s.

@pradeepbn pradeepbn marked this pull request as ready for review November 18, 2021 02:05
@Vanlightly (Contributor) left a comment:
LGTM

@Vanlightly (Contributor):

@eolivelli could you review?

    statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
} else {
    statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
    if (channel.isActive()) {
Contributor:
what about adding an "ELSE" branch with a log message?

Contributor Author:
Done


protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
    channel.writeAndFlush(response, channel.voidPromise());
    if (channel.isActive()) {
Contributor:
what about adding an "ELSE" branch with a log message?

Contributor:
Currently, when the BookieServer is shut down, the response for each in-progress op fails to be sent because the channel is closed, and that failure is recorded as a metric:

if (!future.isSuccess()) {
    requestProcessor.getRequestStats().getChannelWriteStats()
                        .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
} else {
    requestProcessor.getRequestStats().getChannelWriteStats()
                        .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
 }

So either we add an ELSE branch with the same registerFailedEvent call, or we don't use an IF at all and allow it to use the existing logic flow.

@pradeepbn what was the reason for avoiding channel.writeAndFlush?
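
For illustration, the "ELSE branch" variant being suggested might look roughly like this (a sketch; elapsedNanos is a hypothetical placeholder for the time measured since the op started, and the stats accessors simply mirror the snippet above):

if (channel.isActive()) {
    channel.writeAndFlush(response, channel.voidPromise());
} else {
    // Sketch: record the dropped response the same way the write-completion
    // listener above records a failed channel write.
    requestProcessor.getRequestStats().getChannelWriteStats()
            .registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
}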

Contributor:
I'd say this change is not strictly related to shutdown reordering, so I think these channel checks can be removed from this PR.

@pradeepbn (Contributor, Author) Nov 23, 2021:

@Vanlightly

bookie1_1 | 02:15:08,926 ERROR Failed to submit a listener notification task. Event loop shut down?
bookie1_1 | java.util.concurrent.RejectedExecutionException: event executor terminated
bookie1_1 |     at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499) [netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) [netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at org.apache.bookkeeper.proto.PacketProcessorBaseV3.sendResponse(PacketProcessorBaseV3.java:91) [bookkeeper-server.jar:?]
bookie1_1 |     at org.apache.bookkeeper.proto.WriteEntryProcessorV3.sendResponse(WriteEntryProcessorV3.java:180) [bookkeeper-server.jar:?]
bookie1_1 |     at org.apache.bookkeeper.proto.WriteEntryProcessorV3$1.writeComplete(WriteEntryProcessorV3.java:108) [bookkeeper-server.jar:?]
bookie1_1 |     at org.apache.bookkeeper.bookie.Journal$QueueEntry.run(Journal.java:335) [bookkeeper-server.jar:?]
bookie1_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
bookie1_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
bookie1_1 |     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
bookie1_1 |     at java.lang.Thread.run(Thread.java:829) [?:?]

If we do not check isActive(), we get this exception from channel.writeAndFlush at the time of shutdown. This happens because the Netty server is shut down before the request processor.

Contributor:
This is a controlled exception within Netty, but it will pollute the log during shutdown. So I think it's worth keeping the IF statement. What do you think @eolivelli?

Contributor:
ok, let's keep the original "if"

we could add an "else" branch with a DEBUG log that says we are skipping the write; this would probably help in debugging test failures one day

Contributor Author:
Added the else statement with the logs. CC: @eolivelli @Vanlightly

@pradeepbn pradeepbn requested a review from eolivelli November 23, 2021 18:52

pradeepbn commented Nov 23, 2021

rerun failed checks

@pradeepbn (Contributor, Author):

rerun failure checks

if (channel.isActive()) {
    channel.writeAndFlush(response, channel.voidPromise());
} else {
    LOGGER.info("Netty channel is inactive, hence bypassing netty channel writeAndFlush during sendResponse");
Contributor:
please report the channel(), it will bring useful information about the client that won't receive the response.
please use "debug" and not "info"

@pradeepbn (Contributor, Author) Nov 30, 2021:

@eolivelli Changed it. As an example, it prints like this: Netty channel [id: 0x8f9a1f37, L:/192.168.160.3:3181 ! R:/192.168.160.1:61752] is inactive, hence bypassing netty channel writeAndFlush during sendResponse
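
Presumably the resulting branch looks roughly like this (a sketch based on the review comments above, not necessarily the exact merged code):

if (channel.isActive()) {
    channel.writeAndFlush(response, channel.voidPromise());
} else {
    // Debug-level, and includes the channel so the client that will not
    // receive the response can be identified from the log.
    LOGGER.debug("Netty channel {} is inactive, "
            + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
}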

@eolivelli (Contributor) left a comment:
LGTM

@Vanlightly Vanlightly merged commit 7395bb4 into apache:master Dec 1, 2021
zymap pushed a commit that referenced this pull request Jun 16, 2022
Reorders the sequence of the bookkeeper server shutdown
so that any in-progress reads or writes don't hit ledger
storage after it has been shut down. Now the request processor
is shut down before the bookie.

An additional check that the channel is active is performed in
the packet processor callbacks before sending the response,
to prevent RejectedExecutionException messages within
Netty from polluting the log.

(cherry picked from commit 7395bb4)

zymap commented Jun 16, 2022

cherry-picked this to branch-4.14 to resolve the conflict

lhotari pushed a commit to datastax/bookkeeper that referenced this pull request Aug 9, 2022

(cherry picked from commit 7395bb4)
(cherry picked from commit f8eb20d)
nicoloboschi pushed a commit to datastax/bookkeeper that referenced this pull request Jan 11, 2023

(cherry picked from commit 7395bb4)
(cherry picked from commit f8eb20d)
Ghatage pushed a commit to sijie/bookkeeper that referenced this pull request Jul 12, 2024
