Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions benchmarks/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
45 changes: 41 additions & 4 deletions core/src/main/java/org/apache/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,16 @@

public interface BlockingPool<T>
{
/**
* Returns the total pool size.
*/
int maxSize();

/**
* Poll all available resources from the pool. If there's no available resource, it returns an empty list.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BlockingQueue interface has a drainTo method that does something similar. Can you add documentation on how the behavior of this method is similar or different compared to drainTo?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask why you think it should be documented? BlockingPool is our own API and it doesn't use BlockingQueue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to think as a new developer. What would reduce the total spinup time to get effective contributions. There's a lot of methods which are kind of like core java methods, but are a little different, and it gets hard to differentiate what is important and what is not.

IMHO "This is a subset of BlockingQueue with some other added features, but we don't want to depend on keeping up with BlockingQueue upstream changes" is an easier story than "We made up our own interfaces that behave different than other things you've seen, and you have to learn about how the new interfaces are and are not enforced"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a subset of BlockingQueue. It's only similar in terms of "the blocking operation", but it doesn't mean they're similar. The queue and the pool are different. The key characteristic of the queue is that it's FIFO, but there's no order for inserting/getting items to/from the pool.

I don't think anyone would guess BlockingPool's behavior by comparing it with BlockingQueue. If something is not clear, it just means we need to add more detailed doc.

*/
List<ReferenceCountingResourceHolder<T>> pollAll();

/**
* Take a resource from the pool, waiting up to the
* specified wait time if necessary for an element to become available.
Expand All @@ -51,16 +59,45 @@ public interface BlockingPool<T>
* @param elementNum number of resources to take
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
* @return {@link TakeBatchResult} containing a list of resource holders if it succeeds. An empty list should be
* returned if there aren't enough available resources. The result also contains the number of remaining
* resources after this call no matter it succeeded or not.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs);
TakeBatchResult<T> takeBatch(int elementNum, long timeoutMs);

/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
* @return {@link TakeBatchResult} containing a list of resource holders and the number of remaining resources.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum);
TakeBatchResult<T> takeBatch(int elementNum);

class TakeBatchResult<T>
{
private final List<ReferenceCountingResourceHolder<T>> elements;
private final int numAvailableElements;

public TakeBatchResult(List<ReferenceCountingResourceHolder<T>> elements, int numAvailableElements)
{
this.elements = elements;
this.numAvailableElements = numAvailableElements;
}

public boolean isOk()
{
return elements.size() > 0;
}

public List<ReferenceCountingResourceHolder<T>> getElements()
{
return elements;
}

public int getNumAvailableElements()
{
return numAvailableElements;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,28 @@ public int maxSize()
}

@VisibleForTesting
public int getPoolSize()
public int available()
{
return objects.size();
}

@Override
public List<ReferenceCountingResourceHolder<T>> pollAll()
{
checkInitialized();
final ReentrantLock lock = this.lock;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment doing something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, just following other codes.

lock.lock();
try {
final List<T> list = new ArrayList<>(objects.size());
list.addAll(objects);
objects.clear();
return list.stream().map(this::wrapObject).collect(Collectors.toList());
}
finally {
lock.unlock();
}
}

@Override
@Nullable
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
Expand Down Expand Up @@ -160,25 +177,30 @@ private T takeObject() throws InterruptedException
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
public TakeBatchResult<T> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(elementNum > 0, "elementNum should be positive");
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
final List<T> objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum);
return objects.stream().map(this::wrapObject).collect(Collectors.toList());
return new TakeBatchResult<>(objects.stream().map(this::wrapObject).collect(Collectors.toList()), objects.size());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum)
public TakeBatchResult<T> takeBatch(final int elementNum)
{
Preconditions.checkArgument(elementNum > 0, "elementNum should be positive");
checkInitialized();
try {
return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
return new TakeBatchResult<>(
takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()),
objects.size()
);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public int maxSize()
return 0;
}

@Override
public List<ReferenceCountingResourceHolder<T>> pollAll()
{
throw new UnsupportedOperationException();
}

@Override
public ReferenceCountingResourceHolder<T> take(long timeoutMs)
{
Expand All @@ -57,13 +63,13 @@ public ReferenceCountingResourceHolder<T> take()
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs)
public TakeBatchResult<T> takeBatch(int elementNum, long timeoutMs)
{
throw new UnsupportedOperationException();
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum)
public TakeBatchResult<T> takeBatch(int elementNum)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading