Skip to content

Add limit to query result buffering queue#4949

Closed
stevenchen3 wants to merge 3 commits intoapache:masterfrom
stevenchen3:feat-4865
Closed

Add limit to query result buffering queue#4949
stevenchen3 wants to merge 3 commits intoapache:masterfrom
stevenchen3:feat-4865

Conversation

@stevenchen3
Copy link
Copy Markdown

The current implementation of DirectDruidClient maintains an unlimited queue to buffer data returned by query nodes. When query huge amount of data from Druid via broker and the user client side is unable to consume the results as fast as Druid servers produce them, it will cause OutOfMemoryError on broker (see #4865 for more details).

In this PR, we propose to add a configurable limit (i.e., limit to total bytes to buffer at this queue to maxBufferSizeBytes, the default value is Long.MAX_VALUE) to this queue on a per query basis. Compared to maxScatterGatherBytes, it will wait until queryBufferingTimeout (the default value is 5 minutes) when the queue currently reaches its capacity rather than fail the query immediately. However, it still respects the behavior of maxScatterGatherBytes.

Copy link
Copy Markdown
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenchen3 thanks for the contribution! Is this fixing #4933? (It sounds like it is, from your description.)

Could you please fill out the project CLA at http://druid.io/community/cla.html?

return maxBufferSizeBytes - totalBufferedBytes.get();
}

private void checkBufferCapacity(long bytes, long timeoutMs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this method will block until buffer space becomes available. We should make sure it's not running in any threads where that would be problematic.

}
queue.put(new ChannelBufferInputStream(response.getContent()));

checkBufferCapacity(bytes, queryBufferingTimeout);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to be a busy-wait-loop in a netty thread (IIRC that is what handleResponse gets called in) so this could cause starvation of the netty thread pool.

Am I reading this right? If so is there some better way we can do backpressure with netty?

I'm not a netty expert so I'm not sure what the best way is to go here. Ideally we want netty to stop reading from the socket and stop delivering content to us, which would propagate to the historical nodes, which would eventually block up their http threads. But I don't know how to make that happen with netty.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm True. It might cause starvation and that's why I introduced queryBufferingTimeout. This is a workaround to fix the OOMs issue with the broker, I am not a netty expert neither, not sure what will be the best to do backpressure with netty.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you interested in exploring a way to do this without blocking the netty threads? That would really be an ideal solution I think.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like netty also supports some watermark buffers which can possibly be used to achive backpressure.
Atleast the javadocs here seems to indicate this -
http://netty.io/4.1/api/io/netty/channel/WriteBufferWaterMark.html
However this might need an upgrade to netty version used by druid.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm I would love to explore what I can do without blocking netty threads.

checkQueryTimeout();
checkTotalBytesLimit(response.getContent().readableBytes());

final long bytes = response.getContent().readableBytes();
Copy link
Copy Markdown
Contributor

@akashdw akashdw Oct 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: You can rename it totalBytes or something similar.

private void checkBufferCapacity(long bytes, long timeoutMs)
{
final long startTimeMs = System.currentTimeMillis();
boolean isTimeout = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Instead you can do something like:

 private void checkBufferCapacity(long bytes, long timeoutMs)
  {
    final long startTimeMs = System.currentTimeMillis();
    boolean isTimeout = false;
    while (maxBufferSizeBytes < Long.MAX_VALUE && getBufferCapacity() < bytes) {
      if (System.currentTimeMillis() - startTimeMs > timeoutMs) {
        String msg = StringUtils.format(
            "Query[%s] url[%s] max buffer limit reached: waiting for free buffer timeout",
            query.getId(),
            url
        );
        setupResponseReadFailure(msg, null);
        throw new RE(msg);     
      
      }
    }
      totalBufferedBytes.addAndGet(bytes);
  }

}

public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
private static <T> Query<T> withCustomizedArgument(Query<T> query, String key, long value)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider renaming method name to withCustomizedLimit and argument value -> maxLimit

}
queue.put(new ChannelBufferInputStream(response.getContent()));

checkBufferCapacity(bytes, queryBufferingTimeout);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like netty also supports some watermark buffers which can possibly be used to achive backpressure.
Atleast the javadocs here seems to indicate this -
http://netty.io/4.1/api/io/netty/channel/WriteBufferWaterMark.html
However this might need an upgrade to netty version used by druid.

@gianm
Copy link
Copy Markdown
Contributor

gianm commented Jul 19, 2018

It looks like #6014 is attempting to solve the same problem.

@drcrallen
Copy link
Copy Markdown
Contributor

I ended up hitting a LOT of problems when I tried to get #6014 into production that I wrote up in the pr.

@gianm
Copy link
Copy Markdown
Contributor

gianm commented Aug 1, 2018

I think this will run into the same problem that @drcrallen ran into with #6014, due to blocking HttpClient worker threads. I am going to close this since I think we need a different approach: but, @stevenchen3 I hope we can continue the discussion in #6014 or #4933.

@gianm gianm closed this Aug 1, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants