Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
700d3a1
Move some sequence implementations to Stream in Caching Clustered Client
Jun 20, 2018
fe40bca
Fix up build
Jun 20, 2018
20659d2
Work on getting tests to pass
Jun 21, 2018
8f00fda
Don't use guava `Function
Jun 21, 2018
c673f51
Tests pass!
Jun 21, 2018
e86c690
General cleanup and refactoring for readability
Jun 21, 2018
8a1942a
Streams all around
Jun 21, 2018
4c9be21
PR comments
Jun 22, 2018
0eece4b
Add in parallel execution via FJP
Jun 26, 2018
f80a5bc
Move to completion service... but risk deadlocking
Jun 26, 2018
0bdaed2
Revert "Move to completion service... but risk deadlocking"
Jun 26, 2018
1a6b3b6
Actually make the stream parallel
Jun 26, 2018
e86227b
Make the returned merge sequence fetch intermediate results asap
Jun 27, 2018
ae6ae4b
Pass Throwables
Jun 27, 2018
888c42e
Code format
Jun 27, 2018
7377fc2
Eager materialize direct druid client sequences
Jun 27, 2018
40fce24
Fix functionality test
Jun 27, 2018
d711e41
Merge remote-tracking branch 'upstream/master' into cachingclient/str…
Jun 27, 2018
445de11
Remove non-germane changes
Jun 27, 2018
7b66378
LongAdder
Jul 13, 2018
974f3b2
Remove type arguments in test
Jul 13, 2018
7efd96e
Add HybridCache tests
Jul 13, 2018
6341e1c
Remove unused import
Jul 13, 2018
0a72781
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Jul 13, 2018
981a0d6
Fix copyright
Jul 13, 2018
a1d2863
Copyrights
Jul 13, 2018
b3916af
Annonymous fork join pool worker task
Jul 16, 2018
ae3d7ef
simple --> fromStream
Jul 16, 2018
19290ec
Simple identity rename
Jul 16, 2018
f44f9cf
Method ordering refactoring
Jul 16, 2018
6d90500
Refactoring a bit and better javadoc descriptions
Jul 16, 2018
066716e
Formatting
Jul 16, 2018
88444b5
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Jul 16, 2018
9ddc295
Add query context docs
Jul 17, 2018
1875786
Add FJP friendly methods
Jul 23, 2018
f5ecd7e
Change streams to use fjp for merge work
Jul 21, 2018
d014b0f
Fix missed merge conflict
Jul 25, 2018
e876160
Fix missed merge in test
Jul 25, 2018
8c9bc56
Use non-forbidden api
Jul 25, 2018
fa84ac7
Fix merge conflict
Jul 25, 2018
57dc8c7
Fix more missed merge conflicts
Jul 25, 2018
c4bc02c
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Jul 25, 2018
700d5af
Don't use forbidden apis again
Jul 25, 2018
4031bc0
More forbidden apis
Jul 25, 2018
a3fdf08
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Jul 30, 2018
88a0ac9
Add Execs.checkThreadNameFormat
Jul 30, 2018
0130742
Pretty deadline caluclating in ChainedExecutionQueryRunner
Jul 30, 2018
0a938a8
Add javadoc
Jul 30, 2018
34bddef
Refactor to split out functions some more
Jul 30, 2018
4c10bda
Use atomic long for state in fork join naming
Jul 30, 2018
fea19d2
UCASE constant
Aug 6, 2018
314bc28
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Aug 8, 2018
14228d1
Address code comments
Aug 8, 2018
f70b29b
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Aug 17, 2018
0ff889f
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Aug 27, 2018
3aba8b7
Add tests for deadline
Aug 27, 2018
2e85c6a
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Aug 29, 2018
498c909
Address code comments
Aug 29, 2018
4028a22
Add more examples
Aug 29, 2018
66f6af1
newline
Aug 29, 2018
b6ed828
Super awesome mega array preallocation
Aug 29, 2018
ecd465c
Add java docs explaining fork/join mechanism
Aug 29, 2018
1398913
Move some things to single line
Aug 29, 2018
0aca297
Code formatting
Aug 29, 2018
857f548
Fix code formatting
Aug 29, 2018
6ca6339
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Sep 5, 2018
70624fb
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Sep 21, 2018
5e89cb9
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Oct 4, 2018
9723043
Get rid of divide by zero
Oct 4, 2018
6996ec9
Start addressing comments
Oct 4, 2018
9b10334
Merge remote-tracking branch 'apache/master' into cachingclient/strea…
Oct 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ public SerializablePair(@JsonProperty("lhs") T1 lhs, @JsonProperty("rhs") T2 rhs
super(lhs, rhs);
}

@Override
@JsonProperty
public T1 getLhs()
{
return lhs;
return super.getLhs();
}

@Override
@JsonProperty
public T2 getRhs()
{
return rhs;
return super.getRhs();
}
}
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import javax.annotation.Nullable;
import java.util.List;
import java.util.stream.Stream;

/**
*/
Expand Down Expand Up @@ -62,4 +66,17 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f

return null;
}

/**
* Materialze the stream of futures into a single listenable future that will return the list of results.
*
* @param futures The futures to collect into a single Listenable future
* @param <V> The return value for the futures
*
* @return A single ListenableFuture whose return value is a list of the completed values of the input stream.
*/
public static <V> ListenableFuture<List<V>> allFuturesAsList(Stream<ListenableFuture<? extends V>> futures)
{
return Futures.allAsList(futures::iterator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.TimeoutException;

/**
*/
Expand Down Expand Up @@ -138,4 +139,23 @@ public static DateTime maxDateTime(DateTime... times)
return max;
}
}

/**
* Return a qty of millisconds approximately until deadline. If deadline has passed, throw TimeoutException
*
* @param deadline The time on or after which things should be considered "timed out"
*
* @return A millisecond number where, if one were to wait that many milliseconds, the deadline would
* probably have passed. Always greater than zero
*
* @throws TimeoutException If the deadline has already passed (ties are treated as having passed the deadline)
*/
public static long timeoutForDeadline(DateTime deadline) throws TimeoutException
{
final DateTime now = DateTimes.nowUtc();
if (now.isAfter(deadline) || now.isEqual(deadline)) {
throw new TimeoutException(StringUtils.format("Deadline passed: [%s]", deadline));
}
return deadline.getMillis() - now.getMillis();
}
}
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/druid/java/util/common/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.druid.java.util.common;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
*/
Expand All @@ -32,6 +35,14 @@ public static <T1, T2> Pair<T1, T2> of(@Nullable T1 lhs, @Nullable T2 rhs)
return new Pair<>(lhs, rhs);
}

public static <T1, T2> Collector<Pair<T1, T2>, ?, Map<T1, T2>> mapCollector()
{
return Collectors.toMap(
Pair<T1, T2>::getLhs,
Pair<T1, T2>::getRhs
);
}

@Nullable
public final T1 lhs;

Expand All @@ -47,6 +58,18 @@ public Pair(
this.rhs = rhs;
}

@Nullable
public T1 getLhs()
{
return lhs;
}

@Nullable
public T2 getRhs()
{
return rhs;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,30 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
Expand Down Expand Up @@ -147,4 +157,113 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
}
);
}

private static final AtomicLong fjpWorkerThreadCount = new AtomicLong(0L);

public static ForkJoinWorkerThread makeWorkerThread(String name, ForkJoinPool pool)
{
final ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
t.setDaemon(true);
final long threadNumber = fjpWorkerThreadCount.incrementAndGet();
t.setName(StringUtils.nonStrictFormat(name, threadNumber));
return t;
}

private static final int DUMMY_THREAD_NUMBER = 17;

/**
* Fail fast if the format can't take a single argument integer for a thread counter.
*
* Note that LACK of any argument in the format string still renders a valid name
*
* @param format The name format to check
*
* @throws java.util.IllegalFormatException if the format passed in does is not able to take a single thread parameter
*/
public static void checkThreadNameFormat(String format)
{
StringUtils.format(format, DUMMY_THREAD_NUMBER);
}

/**
* Get the result for the future (without timeout), but do so in a way safe for running in a ForkJoinPool
*
* @param future The future to block on completion
* @param <T> The type of the return value
*
* @return The result of the future if successfully completed, or one of the exceptions if not
*
* @throws InterruptedException If the call to future.get() was interrupted
* @throws ExecutionException If the future completed with an exception
*/
public static <T> T futureManagedBlockGet(final Future<? extends T> future)
throws InterruptedException, ExecutionException
{
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
{
@Override
public boolean block() throws InterruptedException
{
try {
future.get();
}
catch (ExecutionException e) {
// Ignore, will be caught when get is called below
}
return true;
}

@Override
public boolean isReleasable()
{
return future.isDone();
}
});
return future.get();
}

/**
* Attempt to get the result of the future before the deadline, but do so in a way safe to run in a ForkJoinPool.
* The deadline is best effort. It is possible the future completes, but the deadline is exceeded before the result
* can be returned. In such a scenario a TimeoutException will be thrown.
*
* The caller is responsible for handling the state of the Future in the case of an exception being thrown.
* Specifically, if an InterruptedException or a TimeoutException is thrown, there is no attempt in this method
* to change the behavior of the future. The caller should handle the potentially still active future as they see fit.
*
* @param future The future to await completion
* @param deadline Best effort deadline for the completion of the future.
* @param <T> The future's yielded type
*
* @return The yield of the future or else a thrown exception
*
* @throws InterruptedException If the call to future.get is interrupted
* @throws TimeoutException If the deadline is exceeded
* @throws ExecutionException If the future completed with an exception
*/
public static <T> T futureManagedBlockGet(final Future<? extends T> future, final DateTime deadline)
throws InterruptedException, TimeoutException, ExecutionException
{
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
{
@Override
public boolean block() throws InterruptedException
{
try {
future.get(JodaUtils.timeoutForDeadline(deadline), TimeUnit.MILLISECONDS);
}
catch (ExecutionException | TimeoutException e) {
// Will get caught later
}
return true;
}

@Override
public boolean isReleasable()
{
return future.isDone() || deadline.isBefore(DateTimes.nowUtc());
}
});
return future.get(JodaUtils.timeoutForDeadline(deadline), TimeUnit.MILLISECONDS);
}
}
Loading