
Optionally refuse to consume new data until the prior chunk is being consumed #6014

Closed
drcrallen wants to merge 3 commits into apache:master from drcrallen:client/broker/backpressure

Conversation

@drcrallen
Contributor

drcrallen commented Jul 17, 2018

Fixes #4933

This PR adds in an optional boolean query context field called enableBrokerBackpressure. The impact of when this flag is set to true is to force the DirectDruidClient to stop collecting new data until this result group is actively being consumed.

In the prior implementation, parallel query results from across the cluster are collected in a LinkedBlockingQueue without bound. This means ALL results can accumulate as channel streams, with no way to limit the outstanding data that has not yet been consumed. Coupled with a largely single-threaded folding operation on result merging (see #5913), it can be an unknown length of time before the results are even attempted to be consumed.

The intention of this change is that a result actively being folded into a sequence has one channel input stream being consumed for folding and one in transit in parallel. This is enforced by using the transfer method instead of the put method when adding new data to the enumerator on the output side.
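
The put-versus-transfer distinction is plain java.util.concurrent behavior. A small standalone demo (JDK only, not Druid code) shows why transfer bounds the number of in-flight chunks:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Demo of put vs. transfer on a LinkedTransferQueue. put() returns immediately,
// so a producer can pile up an unbounded backlog; transfer() parks the producer
// until a consumer actually receives the element, which is what limits a query
// to one chunk being folded plus one in transit.
public class TransferDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

    queue.put("chunk-1"); // enqueued immediately, even with no reader attached

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          System.out.println("consumed " + queue.take()); // the folding side
        }
      }
      catch (InterruptedException ignored) {
        // demo shutdown
      }
    });
    consumer.start();

    queue.transfer("chunk-2"); // blocks here until the consumer takes the chunk
    TimeUnit.MILLISECONDS.sleep(100);
    consumer.interrupt();
  }
}
```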

The feature is gated behind a context flag, so it should have no impact on anyone who does not enable it.

Work before merging:

  • Test on an actual cluster

@gianm
Contributor

gianm commented Jul 18, 2018

Is this meant to be addressing #4933? What effect does enabling this option have on historicals -- I guess backpressure will extend all the way to them, potentially blocking their http threads?

@drcrallen
Contributor Author

@gianm yes, the main risk is blocking the http threads of historicals. Since results are merged in http threads and not processing threads, other queries should be able to continue to do work, but it could block new queries from coming in.

@drcrallen
Contributor Author

This seems to cause deadlocks. Right now it looks like the "parallel merging" doesn't let the FJP know that iterating through the results is a blocking operation, so the FJP can get clogged with TransferQueue poll operations that never match up with the http client transfer operations, causing http and FJP threads to deadlock each other.
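
For reference, ForkJoinPool.managedBlock is the JDK mechanism for telling an FJP that a worker is about to block, so the pool can add a compensating thread instead of silently losing a worker. A sketch of wrapping a blocking take this way (plain JDK; whether and where the PR applies it is not shown here):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TransferQueue;

// Sketch of ForkJoinPool.managedBlock around a blocking queue wait, not the
// PR's code. Declaring the wait as a ManagedBlocker lets the pool compensate
// for the parked worker rather than starving.
public final class ManagedTake
{
  public static <T> T take(final TransferQueue<T> queue) throws InterruptedException
  {
    final Object[] result = new Object[1];
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
    {
      @Override
      public boolean isReleasable()
      {
        result[0] = queue.poll(); // grab an element without blocking if we can
        return result[0] != null;
      }

      @Override
      public boolean block() throws InterruptedException
      {
        result[0] = queue.take(); // the actual blocking wait
        return true;              // done; no further blocking needed
      }
    });
    @SuppressWarnings("unchecked")
    final T value = (T) result[0];
    return value;
  }
}
```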

@drcrallen
Contributor Author

I'm looking to see if there's a fix.

@drcrallen
Contributor Author

drcrallen commented Jul 18, 2018

I'm a bit baffled as to what is blocking here. There are a few threads competing for locks:

"HttpClient-Netty-Worker-87" - Thread t@123
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <28fa59c3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at io.druid.client.DirectDruidClient$1.handleChunk(DirectDruidClient.java:335)
        at io.druid.java.util.http.client.NettyHttpClient$1.messageReceived(NettyHttpClient.java:225)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:135)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459)
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:485)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <576b7c74> (a java.util.concurrent.ThreadPoolExecutor$Worker)

for producing into the InputStream queue, and

"processing-fjp-3" - Thread t@437
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4e912efa> (a com.google.common.util.concurrent.AbstractFuture$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:285)
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
        at io.druid.client.DirectDruidClient$JsonParserIterator$1.block(DirectDruidClient.java:619)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
        at io.druid.client.DirectDruidClient$JsonParserIterator.init(DirectDruidClient.java:613)
        at io.druid.client.DirectDruidClient$JsonParserIterator.hasNext(DirectDruidClient.java:574)
        at io.druid.java.util.common.guava.BaseSequence.makeYielder(BaseSequence.java:87)
        at io.druid.java.util.common.guava.BaseSequence.toYielder(BaseSequence.java:67)
        at io.druid.java.util.common.guava.MappedSequence.toYielder(MappedSequence.java:49)
        at io.druid.java.util.common.guava.MergeSequence.lambda$toYielder$0(MergeSequence.java:56)
        at io.druid.java.util.common.guava.MergeSequence$$Lambda$177/186424324.accumulate(Unknown Source)
        at io.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:45)
        at io.druid.java.util.common.guava.MergeSequence.toYielder(MergeSequence.java:53)
        at io.druid.java.util.common.guava.YieldingSequenceBase.accumulate(YieldingSequenceBase.java:32)
        at io.druid.java.util.common.guava.Sequence.toList(Sequence.java:76)
        at io.druid.java.util.common.guava.MergeWorkTask.exec(MergeWorkTask.java:199)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

for initializing the results response.

Also

"processing-fjp-0" - Thread t@434
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b5e2920> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at io.druid.client.DirectDruidClient$1$2.nextElement(DirectDruidClient.java:303)
        at io.druid.client.DirectDruidClient$1$2.nextElement(DirectDruidClient.java:279)
        at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:110)
        at java.io.SequenceInputStream.read(SequenceInputStream.java:211)
        at com.fasterxml.jackson.dataformat.smile.SmileParser.loadMore(SmileParser.java:412)
        at com.fasterxml.jackson.dataformat.smile.SmileParser.nextToken(SmileParser.java:590)
        at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(UntypedObjectDeserializer.java:652)
        at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(UntypedObjectDeserializer.java:496)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:217)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:25)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromArray(BeanDeserializerBase.java:1229)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:157)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136)
        at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:463)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:378)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
        at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3708)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2052)
        at io.druid.client.DirectDruidClient$JsonParserIterator.next(DirectDruidClient.java:593)
        at io.druid.java.util.common.guava.BaseSequence.makeYielder(BaseSequence.java:88)
        at io.druid.java.util.common.guava.BaseSequence.toYielder(BaseSequence.java:67)
        at io.druid.java.util.common.guava.MappedSequence.toYielder(MappedSequence.java:49)
        at io.druid.java.util.common.guava.MergeSequence.lambda$toYielder$0(MergeSequence.java:56)
        at io.druid.java.util.common.guava.MergeSequence$$Lambda$177/186424324.accumulate(Unknown Source)
        at io.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:45)
        at io.druid.java.util.common.guava.MergeSequence.toYielder(MergeSequence.java:53)
        at io.druid.java.util.common.guava.YieldingSequenceBase.accumulate(YieldingSequenceBase.java:32)
        at io.druid.java.util.common.guava.Sequence.toList(Sequence.java:76)
        at io.druid.java.util.common.guava.MergeWorkTask.exec(MergeWorkTask.java:199)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

for moving data from the input streams into the merge sequences.

Still looking to see where things are deadlocking.

@drcrallen
Contributor Author

As far as I can tell, the problem is that the channel future doesn't return from a get until the call is completed, but the call can't complete because it is waiting for the queue to free up.
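
A stripped-down, self-contained reproduction of that cycle (plain JDK, not Druid code; it hangs by construction): the consumer waits on a future whose completion requires a hand-off that only the consumer itself can unblock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;

// Deliberately deadlocking sketch of the cycle described above. The "netty"
// thread cannot complete the future until its hand-off is taken; the consumer
// cannot take the hand-off because it is parked in get().
public class FutureHandoffDeadlock
{
  public static void main(String[] args) throws Exception
  {
    final SynchronousQueue<String> handoff = new SynchronousQueue<>();
    final CompletableFuture<Void> done = new CompletableFuture<>();

    new Thread(() -> {
      try {
        handoff.put("chunk"); // blocks until someone takes the chunk...
        done.complete(null);  // ...so completion is never reached
      }
      catch (InterruptedException ignored) {
      }
    }).start();

    done.get();     // parked forever: completion needs the take() below,
    handoff.take(); // which this thread can never reach
  }
}
```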

@drcrallen
Contributor Author

I refactored things a bit and am going to try out a slightly modified DirectDruidClient that only uses the HttpClient code to feed in InputStreams, rather than trying to wait for an entire result to succeed or fail.

@gianm
Contributor

gianm commented Jul 19, 2018

@drcrallen, a question: what happens when one query has a huge set of data to pull in, but the others don't? Will the entire broker start to block, or will the "nice" queries get to keep gathering data while the "not nice" one blocks?

@drcrallen
Contributor Author

This ends up causing a lot of problems.

I wrestled with this for a long time but was unable to come up with a clean solution for the competing threads. I'm going to record what I found here and then close the PR:

There are two pools of HttpClients in the broker: one for the broker connections and one for a global client pool. The broker HttpClient worker threads hold connections to the historicals and call the callbacks in the handlers. It is unclear whether one worker keeps one handler active all the time, or whether there are internal "pipelines" or "channels" that are maintained and assigned to worker threads as results come back; I couldn't debug to that level of detail. The pool has a fixed maximum thread count.

There is also the qtp pool for the http server, which services the logic for handling the query itself (the Sequence work). It has a fixed thread maximum.

With the parallel merge PR, there is also a ForkJoinPool for forking and joining merge work.

The challenge in managing these thread pools played out as follows:

Chunks would come back, be processed by broker HttpClient worker threads, and attempt to call the callbacks of the DirectDruidClient handler. If the DirectDruidClient handler blocked, the callbacks would not progress and the HttpClient thread would stall, risking pool starvation.

The jetty server http thread pool would be trying to feed off a supplier of some kind so that it could handle the Sequence materialization back up to the calling client.

The ForkJoinPool in the parallel merge PR would try to handle pushes from the HttpClient, do the merge work, and then provide a way for the jetty server thread to feed the results out.

Trying to keep all the thread pools in lock-step to prevent thread starvation proved too much. I couldn't get the tunings or thread pool capacity predictions to a reasonable state, and the coordination started to look horrendous. For example, how can you make sure that HttpClient threads don't starve their connection limits to druid historicals under concurrent query loads? The extra frustrating part was that lots of unit tests and local integration tests worked great, but as soon as I deployed the code to a real production system I would instantly hit locking problems and thread starvation.

I think the "correct" solution is to move to a framework that handles the scatter/gather nature of the druid query in a more start-to-finish way, rather than piecing together a bunch of disjoint components.

The resource contention considerations are just really high without higher-level cooperation among concurrent queries on how to handle thread pool resources.

@gianm
Contributor

gianm commented Aug 1, 2018

@drcrallen It's too bad it didn't work out!

If the DirectDruidClient handler blocked, then the callbacks would not progress and the HttpClient thread would be stalled and risk pool starvation.

I think the issue is that this really can't happen. IIRC, the handlers run in a Netty async worker pool, and so if one of them "hogs" the pool for more than a split second, things can come to a screeching halt.

I believe that a workable approach to backpressure must involve cooperation from Netty in some way.

@gianm
Contributor

gianm commented Aug 1, 2018

Mucking with auto-read seems to be a viable approach: http://normanmaurer.me/presentations/2014-http-netty/slides.html#47.0, https://stackoverflow.com/questions/28273305/why-netty-4-proxy-example-has-to-set-channel-auto-read-as-false.

It might be time to move to a Netty 4.x based HTTP client: https://netty.io/wiki/new-and-noteworthy-in-4.0.html

3.x had an unintuitive inbound traffic suspension mechanism provided by Channel.setReadable(boolean). It introduced complicated interactions between ChannelHandlers and the handlers were easy to interfere with each other if implemented incorrectly.

In 4.0, a new outbound operation called read() has been added. If you turn off the default auto-read flag with Channel.config().setAutoRead(false), Netty will not read anything until you explicitly invoke the read() operation. Once the read() operation you issue is complete and the channel again stops reading, an inbound event called channelReadSuspended() will be triggered so that you can re-issue another read() operation. You can also intercept a read() operation to perform more advanced traffic control.
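To make the quoted mechanism concrete, here is a hedged sketch of a Netty 4 inbound handler that disables auto-read and requests the next chunk only after the application has drained the current one. ChunkConsumer is a hypothetical application-side callback; setAutoRead(false) and the explicit read() calls are the real Netty 4 API.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Sketch of the Netty 4 auto-read backpressure pattern described in the quote,
// not a Druid implementation. Unread bytes stay in socket buffers, so TCP flow
// control pushes back on the sender.
public class BackpressureHandler extends ChannelInboundHandlerAdapter
{
  public interface ChunkConsumer
  {
    // Must invoke onConsumed once the chunk has been drained downstream.
    void consume(Object chunk, Runnable onConsumed);
  }

  private final ChunkConsumer consumer;

  public BackpressureHandler(ChunkConsumer consumer)
  {
    this.consumer = consumer;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx)
  {
    ctx.channel().config().setAutoRead(false); // we decide when to read
    ctx.read();                                // pull the first chunk
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg)
  {
    // Do not read() again here; ask for more only after consumption.
    consumer.consume(msg, ctx::read);
  }
}
```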

@gianm
Contributor

gianm commented Sep 7, 2018

Well, I gave that "unintuitive inbound traffic suspension mechanism" in Netty 3.x a shot in #6313. It doesn't seem so bad, so I hope I implemented it correctly! It seems to work in early testing on a real cluster: it exerts backpressure on queries to historicals without blocking I/O threads on the broker.
