Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

public class FutureUtils
Expand Down Expand Up @@ -205,4 +207,34 @@ public void onFailure(Throwable e)

return retVal;
}

/**
* Adds success and failure callbacks to the given future.
*/
public static <V> void addCallback(
ListenableFuture<V> future,
Executor executor,
Consumer<V> onSuccess,
Consumer<Throwable> onFailure
)
{
Futures.addCallback(
future,
new FutureCallback<V>()
{
@Override
public void onSuccess(@Nullable V result)
{
onSuccess.accept(result);
}

@Override
public void onFailure(Throwable t)
{
onFailure.accept(t);
}
},
executor
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,22 @@ private Stopwatch(com.google.common.base.Stopwatch delegate)
this.delegate = delegate;
}

public synchronized void start()
public synchronized Stopwatch start()
{
delegate.start();
return this;
}

public synchronized void stop()
public synchronized Stopwatch stop()
{
delegate.stop();
return this;
}

public synchronized void reset()
public synchronized Stopwatch reset()
{
delegate.reset();
return this;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,61 @@ public void test_futureWithBaggage_failure()
MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
Assert.assertEquals(1, baggageHandled.get());
}

@Test
public void test_addCallback_success()
{
final SettableFuture<Long> future = SettableFuture.create();

final AtomicLong observedValue = new AtomicLong(0);
final AtomicReference<Throwable> observedError = new AtomicReference<>();
FutureUtils.addCallback(
future,
Execs.directExecutor(),
observedValue::set,
observedError::set
);

future.set(101L);
Assert.assertEquals(101L, observedValue.get());
Assert.assertNull(observedError.get());
}

@Test
public void test_addCallback_failure()
{
final SettableFuture<String> future = SettableFuture.create();

final AtomicReference<String> observedValue = new AtomicReference<>();
final AtomicReference<Throwable> observedError = new AtomicReference<>();
FutureUtils.addCallback(
future,
Execs.directExecutor(),
observedValue::set,
observedError::set
);

future.setException(new ISE("an error occurred"));
Assert.assertNull(observedValue.get());
Assert.assertTrue(observedError.get() instanceof ISE);
}

@Test
public void test_addCallback_cancelled()
{
final SettableFuture<String> future = SettableFuture.create();

final AtomicReference<String> observedValue = new AtomicReference<>();
final AtomicReference<Throwable> observedError = new AtomicReference<>();
FutureUtils.addCallback(
future,
Execs.directExecutor(),
observedValue::set,
observedError::set
);

future.cancel(true);
Assert.assertNull(observedValue.get());
Assert.assertTrue(observedError.get() instanceof CancellationException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,22 @@ public void testHasElapsed()
Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(101)));
Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(500)));
}

@Test
public void testChainedMethods()
{
FakeTicker fakeTicker = new FakeTicker();
Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker);

fakeTicker.advance(100, TimeUnit.MILLISECONDS);
Assert.assertEquals(100, stopwatch.millisElapsed());

stopwatch.stop().start();
Assert.assertTrue(stopwatch.isRunning());
Assert.assertEquals(100, stopwatch.millisElapsed());

stopwatch.stop().reset();
Assert.assertFalse(stopwatch.isRunning());
Assert.assertEquals(0, stopwatch.millisElapsed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.joda.time.Interval;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -55,5 +58,13 @@ public interface OverlordClient

ListenableFuture<TaskPayloadResponse> taskPayload(String taskId);

ListenableFuture<List<TaskStatusPlus>> allActiveTasks();

ListenableFuture<Integer> totalWorkerCapacity();

ListenableFuture<Integer> totalWorkerCapacityWithAutoScale();

ListenableFuture<Map<String, List<Interval>>> lockedIntervals(Map<String, Integer> datasourceToMinTaskPriority);

OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
Expand All @@ -36,10 +40,14 @@
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Production implementation of {@link OverlordClient}.
Expand Down Expand Up @@ -148,12 +156,82 @@ public ListenableFuture<TaskPayloadResponse> taskPayload(String taskId)
);
}

@Override
public ListenableFuture<List<TaskStatusPlus>> allActiveTasks()
{
return FutureUtils.transform(
Futures.allAsList(
getTasksOfType("waitingTasks"),
getTasksOfType("pendingTasks"),
getTasksOfType("runningTasks")
),
listOfList -> listOfList.stream().flatMap(Collection::stream).collect(Collectors.toList())
);
}

@Override
public ListenableFuture<Integer> totalWorkerCapacity()
{
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/workers"),
new BytesFullResponseHandler()
),
holder ->
deserialize(holder, new TypeReference<Collection<IndexingWorkerInfo>>() {})
.stream()
.mapToInt(workerInfo -> workerInfo.getWorker().getCapacity())
.sum()
);
}

@Override
public ListenableFuture<Integer> totalWorkerCapacityWithAutoScale()
{
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity"),
new BytesFullResponseHandler()
),
holder ->
deserialize(holder, new TypeReference<IndexingTotalWorkerCapacityInfo>() {})
.getMaximumCapacityWithAutoScale()
);
}

@Override
public ListenableFuture<Map<String, List<Interval>>> lockedIntervals(
Map<String, Integer> datasourceToMinTaskPriority
)
{
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals")
.jsonContent(jsonMapper, datasourceToMinTaskPriority),
new BytesFullResponseHandler()
),
holder -> deserialize(holder, new TypeReference<Map<String, List<Interval>>>() {})
);
}

@Override
public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return new OverlordClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
}

private ListenableFuture<List<TaskStatusPlus>> getTasksOfType(String type)
{
final String path = StringUtils.format("/druid/indexer/v1/%s", StringUtils.urlEncode(type));
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> deserialize(holder, new TypeReference<List<TaskStatusPlus>>() {})
);
}

private <T> T deserialize(final BytesFullResponseHolder bytesHolder, final Class<T> clazz)
{
try {
Expand Down
Loading