Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public ThreadRenamingCallable(
}

@Override
public final T call()
public final T call() throws Exception
{
final Thread currThread = Thread.currentThread();
String currName = currThread.getName();
Expand All @@ -48,5 +48,5 @@ public final T call()
}
}

public abstract T doCall();
public abstract T doCall() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
Expand All @@ -48,10 +47,13 @@
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
Expand Down Expand Up @@ -131,6 +133,8 @@ public class AppenderatorImpl implements Appenderator
// and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock();

private final AtomicBoolean closed = new AtomicBoolean(false);

private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
// use intermediate executor so that deadlock conditions can be prevented
Expand All @@ -140,7 +144,8 @@ public class AppenderatorImpl implements Appenderator
private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null;
private AtomicBoolean closed = new AtomicBoolean(false);

private volatile Throwable persistError;

public AppenderatorImpl(
DataSchema schema,
Expand Down Expand Up @@ -198,6 +203,13 @@ public Object startJob()
return retVal;
}

/**
 * Rethrows, on the caller's thread, any failure recorded by an earlier
 * asynchronous persist (set via the {@code onFailure} callback registered on
 * the future returned by {@code persistAll}). No-op when no error has occurred.
 *
 * @throws RE wrapping the recorded persist failure, if one exists
 */
private void throwPersistErrorIfExists()
{
  // Snapshot the volatile field once: checking and then re-reading
  // persistError as two separate volatile reads could observe different
  // values if another thread writes the field in between.
  final Throwable error = persistError;
  if (error != null) {
    throw new RE(error, "Error while persisting");
  }
}

@Override
public AppenderatorAddResult add(
final SegmentIdentifier identifier,
Expand All @@ -206,6 +218,8 @@ public AppenderatorAddResult add(
final boolean allowIncrementalPersists
) throws IndexSizeExceededException, SegmentNotWritableException
{
throwPersistErrorIfExists();

if (!identifier.getDataSource().equals(schema.getDataSource())) {
throw new IAE(
"Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
Expand Down Expand Up @@ -244,7 +258,23 @@ public AppenderatorAddResult add(
|| rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
persistAll(committerSupplier == null ? null : committerSupplier.get());
Futures.addCallback(
persistAll(committerSupplier == null ? null : committerSupplier.get()),
new FutureCallback<Object>()
{
@Override
public void onSuccess(@Nullable Object result)
{
// do nothing
}

@Override
public void onFailure(Throwable t)
{
persistError = t;
}
}
);
} else {
isPersistRequired = true;
}
Expand Down Expand Up @@ -340,6 +370,8 @@ public void clear() throws InterruptedException
// Drop commit metadata, then abandon all segments.

try {
throwPersistErrorIfExists();

if (persistExecutor != null) {
final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
new Callable<Object>()
Expand Down Expand Up @@ -373,7 +405,7 @@ public Object call() throws Exception
}
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

Expand All @@ -391,6 +423,8 @@ public ListenableFuture<?> drop(final SegmentIdentifier identifier)
@Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
throwPersistErrorIfExists();

final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
Expand Down Expand Up @@ -427,7 +461,7 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
new ThreadRenamingCallable<Object>(threadName)
{
@Override
public Object doCall()
public Object doCall() throws IOException
{
try {
for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
Expand Down Expand Up @@ -469,9 +503,9 @@ public Object doCall()
// return null if committer is null
return commitMetadata;
}
catch (Exception e) {
catch (IOException e) {
metrics.incrementFailedPersists();
throw Throwables.propagate(e);
throw e;
}
finally {
metrics.incrementNumPersists();
Expand Down