diff --git a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java index 2fd6877a9656..811fe59ce017 100644 --- a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java +++ b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java @@ -35,7 +35,7 @@ public ThreadRenamingCallable( } @Override - public final T call() + public final T call() throws Exception { final Thread currThread = Thread.currentThread(); String currName = currThread.getName(); @@ -48,5 +48,5 @@ public final T call() } } - public abstract T doCall(); + public abstract T doCall() throws Exception; } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 70c22187d8dd..3f0f0bac73b0 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -33,12 +33,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; @@ -48,10 +47,13 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.io.Closer; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -131,6 +133,8 @@ public class AppenderatorImpl implements Appenderator // and abandon threads do not step over each other private final Lock commitLock = new ReentrantLock(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile ListeningExecutorService persistExecutor = null; private volatile ListeningExecutorService pushExecutor = null; // use intermediate executor so that deadlock conditions can be prevented @@ -140,7 +144,8 @@ public class AppenderatorImpl implements Appenderator private volatile long nextFlush; private volatile FileLock basePersistDirLock = null; private volatile FileChannel basePersistDirLockChannel = null; - private AtomicBoolean closed = new AtomicBoolean(false); + + private volatile Throwable persistError; public AppenderatorImpl( DataSchema schema, @@ -198,6 +203,13 @@ public Object startJob() return retVal; } + private void throwPersistErrorIfExists() + { + if (persistError != null) { + throw new RE(persistError, "Error while persisting"); + } + } + @Override public AppenderatorAddResult add( final SegmentIdentifier identifier, @@ -206,6 +218,8 @@ public AppenderatorAddResult add( final boolean allowIncrementalPersists ) throws IndexSizeExceededException, SegmentNotWritableException { + throwPersistErrorIfExists(); + if (!identifier.getDataSource().equals(schema.getDataSource())) { throw new IAE( "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", @@ -244,7 +258,23 @@ public AppenderatorAddResult add( || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { if (allowIncrementalPersists) { // persistAll clears rowsCurrentlyInMemory, no need to update it. - persistAll(committerSupplier == null ? null : committerSupplier.get()); + Futures.addCallback( + persistAll(committerSupplier == null ? null : committerSupplier.get()), + new FutureCallback() + { + @Override + public void onSuccess(@Nullable Object result) + { + // do nothing + } + + @Override + public void onFailure(Throwable t) + { + persistError = t; + } + } + ); } else { isPersistRequired = true; } @@ -340,6 +370,8 @@ public void clear() throws InterruptedException // Drop commit metadata, then abandon all segments. try { + throwPersistErrorIfExists(); + if (persistExecutor != null) { final ListenableFuture uncommitFuture = persistExecutor.submit( new Callable() @@ -373,7 +405,7 @@ public Object call() throws Exception } } catch (ExecutionException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -391,6 +423,8 @@ public ListenableFuture drop(final SegmentIdentifier identifier) @Override public ListenableFuture persistAll(@Nullable final Committer committer) { + throwPersistErrorIfExists(); + final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); int numPersistedRows = 0; @@ -427,7 +461,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) new ThreadRenamingCallable(threadName) { @Override - public Object doCall() + public Object doCall() throws IOException { try { for (Pair pair : indexesToPersist) { @@ -469,9 +503,9 @@ public Object doCall() // return null if committer is null return commitMetadata; } - catch (Exception e) { + catch (IOException e) { metrics.incrementFailedPersists(); - throw Throwables.propagate(e); + throw e; } finally { metrics.incrementNumPersists();