From 186aaa4f21c9315a03da3e7390ab2c7644e9be3b Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 14 Nov 2016 17:05:31 -0600 Subject: [PATCH 1/5] Removing unused code from io.druid.java.util.common.guava package; fix #3563 (more consistent and paranoiac resource handing in Sequences subsystem); Add Sequences.wrap() for DRY in MetricsEmittingQueryRunner, CPUTimeMetricQueryRunner and SpecificSegmentQueryRunner; Catch MissingSegmentsException in SpecificSegmentQueryRunner's yielder.next() method (follow up on #3617) --- .../common/guava/CombiningSequenceTest.java | 5 +- .../java/util/common/guava/BaseSequence.java | 63 +++---- .../util/common/guava/ConcatSequence.java | 23 +-- .../common/guava/ExecuteWhenDoneYielder.java | 5 +- .../guava/ExecutorExecutingSequence.java | 141 -------------- .../io/druid/java/util/common/guava/Fns.java | 52 ------ .../common/guava/IteratorWithBaggage.java | 67 ------- .../util/common/guava/LimitedSequence.java | 10 +- .../guava/LimitedYieldingAccumulator.java | 70 ------- .../common/guava/ResourceClosingSequence.java | 64 ------- .../common/guava/ResourceClosingYielder.java | 65 ------- .../util/common/guava/SequenceWrapper.java | 73 ++++++++ .../java/util/common/guava/Sequences.java | 44 ++++- .../util/common/guava/SimpleSequence.java | 56 ------ .../util/common/guava/WrappingSequence.java | 102 +++++++++++ .../util/common/guava/WrappingYielder.java | 103 +++++++++++ .../java/util/common/guava/YieldSign.java | 27 --- .../java/util/common/guava/Yielders.java | 15 +- .../util/common/guava/BaseSequenceTest.java | 4 +- .../guava/ExecutorExecutingSequenceTest.java | 173 ------------------ .../java/util/common/guava/TestSequence.java | 30 ++- .../common/guava/WithEffectSequenceTest.java | 75 ++++++++ ...nceTest.java => WrappingSequenceTest.java} | 43 ++++- .../druid/query/CPUTimeMetricQueryRunner.java | 75 +------- .../druid/query/GroupByMergedQueryRunner.java | 3 +- .../query/MetricsEmittingQueryRunner.java | 120 +++--------- .../io/druid/query/QueryRunnerHelper.java | 3 +- .../ReferenceCountingSegmentQueryRunner.java | 4 +- .../epinephelinae/GroupByQueryEngineV2.java | 3 +- .../groupby/strategy/GroupByStrategyV1.java | 5 +- .../spec/SpecificSegmentQueryRunner.java | 97 ++++------ .../segment/ReferenceCountingSequence.java | 51 ------ .../druid/client/CachingQueryRunnerTest.java | 36 ++-- 33 files changed, 604 insertions(+), 1103 deletions(-) delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/ExecutorExecutingSequence.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/Fns.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/IteratorWithBaggage.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingSequence.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingYielder.java create mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/SequenceWrapper.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/SimpleSequence.java create mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/WrappingSequence.java create mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/WrappingYielder.java delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/YieldSign.java delete mode 100644 java-util/src/test/java/io/druid/java/util/common/guava/ExecutorExecutingSequenceTest.java create mode 100644 java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java rename java-util/src/test/java/io/druid/java/util/common/guava/{ResourceClosingSequenceTest.java => WrappingSequenceTest.java} (55%) delete mode 100644 processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java diff --git a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java index 974d18da4ff9..040bac1739cb 100644 --- a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java +++ b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java @@ -24,15 +24,12 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; - import io.druid.java.util.common.Pair; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.guava.nary.BinaryFn; - import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -232,7 +229,7 @@ public void close() throws IOException Sequence> seq = Sequences.limit( new CombiningSequence<>( - new ResourceClosingSequence<>(Sequences.simple(pairs), closeable), + Sequences.withBaggage(Sequences.simple(pairs), closeable), Ordering.natural().onResultOf(Pair.lhsFn()), new BinaryFn, Pair, Pair>() { diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/BaseSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/BaseSequence.java index e4f49a1d45a0..e715a3d21908 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/BaseSequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/BaseSequence.java @@ -19,9 +19,6 @@ package io.druid.java.util.common.guava; -import com.google.common.base.Throwables; -import io.druid.java.util.common.logger.Logger; - import java.io.Closeable; import java.io.IOException; import java.util.Iterator; @@ -30,30 +27,9 @@ */ public class BaseSequence> implements Sequence { - private static final Logger log = new Logger(BaseSequence.class); private final IteratorMaker maker; - public static Sequence simple(final Iterable iterable) - { - return new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public Iterator make() - { - return iterable.iterator(); - } - - @Override - public void cleanup(Iterator iterFromMake) - { - - } - } - ); - } - public BaseSequence( IteratorMaker maker ) @@ -69,11 +45,18 @@ public OutType accumulate(OutType initValue, final Accumulator Yielder toYielder(OutType initValue, YieldingAccumulat try { return makeYielder(initValue, accumulator, iterator); } - catch (Exception e) { - // We caught an Exception instead of returning a really, real, live, real boy, errr, iterator - // So we better try to close our stuff, 'cause the exception is what is making it out of here. + catch (Throwable t) { try { maker.cleanup(iterator); } - catch (RuntimeException e1) { - log.error(e1, "Exception thrown when closing maker. Logging and ignoring."); + catch (Exception e) { + t.addSuppressed(e); } - throw Throwables.propagate(e); + throw t; } } @@ -138,16 +119,14 @@ public Yielder next(OutType initValue) try { return makeYielder(initValue, accumulator, iter); } - catch (Exception e) { - // We caught an Exception instead of returning a really, real, live, real boy, errr, iterator - // So we better try to close our stuff, 'cause the exception is what is making it out of here. + catch (Throwable t) { try { maker.cleanup(iter); } - catch (RuntimeException e1) { - log.error(e1, "Exception thrown when closing maker. Logging and ignoring."); + catch (Exception e) { + t.addSuppressed(e); } - throw Throwables.propagate(e); + throw t; } } @@ -165,10 +144,10 @@ public void close() throws IOException }; } - public static interface IteratorMaker> + public interface IteratorMaker> { - public IterType make(); + IterType make(); - public void cleanup(IterType iterFromMake); + void cleanup(IterType iterFromMake); } } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java index 2c8f461f0738..7479b7c0a952 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java @@ -21,6 +21,7 @@ import com.google.common.base.Throwables; +import java.io.Closeable; import java.io.IOException; /** @@ -73,11 +74,14 @@ public Sequence accumulate(Sequence accumulated, Sequence in) try { return makeYielder(yielderYielder, initValue, accumulator); } - catch (RuntimeException e) { - // We caught a RuntimeException instead of returning a really, real, live, real boy, errr, iterator - // So we better try to close our stuff, 'cause the exception is what is making it out of here. - CloseQuietly.close(yielderYielder); - throw e; + catch (Throwable t) { + try { + yielderYielder.close(); + } + catch (Exception e) { + t.addSuppressed(e); + } + throw t; } } @@ -87,10 +91,6 @@ public Yielder makeYielder( YieldingAccumulator accumulator ) { - if (yielderYielder.isDone()) { - return Yielders.done(initValue, yielderYielder); - } - while (!yielderYielder.isDone()) { Yielder yielder = yielderYielder.get().toYielder(initValue, accumulator); if (accumulator.yielded()) { @@ -152,8 +152,9 @@ public boolean isDone() @Override public void close() throws IOException { - yielder.close(); - yielderYielder.close(); + try (Closeable toClose = yielderYielder) { + yielder.close(); + } } }; } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java b/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java index 4b1adf8512d5..df4e7cfd92a5 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java @@ -56,9 +56,10 @@ public boolean isDone() @Override public void close() throws IOException { - if (isDone()) { + boolean done = isDone(); + baseYielder.close(); + if (done) { executor.execute(runnable); } - baseYielder.close(); } } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ExecutorExecutingSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/ExecutorExecutingSequence.java deleted file mode 100644 index d85d8f8a6f27..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ExecutorExecutingSequence.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import com.google.common.base.Throwables; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -/** - */ -public class ExecutorExecutingSequence implements Sequence -{ - private final Sequence sequence; - private final ExecutorService exec; - - public ExecutorExecutingSequence( - Sequence sequence, - ExecutorService exec - ) - { - this.sequence = sequence; - this.exec = exec; - } - - @Override - public OutType accumulate(final OutType initValue, final Accumulator accumulator) - { - Future future = exec.submit( - new Callable() - { - @Override - public OutType call() throws Exception - { - return sequence.accumulate(initValue, accumulator); - } - } - ); - try { - return future.get(); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - catch (ExecutionException e) { - throw Throwables.propagate(e); - } - } - - @Override - public Yielder toYielder(final OutType initValue, final YieldingAccumulator accumulator) - { - Future> future = exec.submit( - new Callable>() - { - @Override - public Yielder call() throws Exception - { - return makeYielder(sequence.toYielder(initValue, accumulator)); - } - } - ); - try { - return future.get(); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - catch (ExecutionException e) { - throw Throwables.propagate(e); - } - } - - private Yielder makeYielder(final Yielder yielder) - { - return new Yielder() - { - @Override - public OutType get() - { - return yielder.get(); - } - - @Override - public Yielder next(final OutType initValue) - { - Future> future = exec.submit( - new Callable>() - { - @Override - public Yielder call() throws Exception - { - return makeYielder(yielder.next(initValue)); - } - } - ); - try { - return future.get(); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - catch (ExecutionException e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean isDone() - { - return yielder.isDone(); - } - - @Override - public void close() throws IOException - { - yielder.close(); - } - }; - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Fns.java b/java-util/src/main/java/io/druid/java/util/common/guava/Fns.java deleted file mode 100644 index 9447d2f41c27..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Fns.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import com.google.common.base.Function; - -import java.util.Map; - -/** - */ -public class Fns -{ - public static Function splitFn(final String splitChar, final int numCols) - { - return new Function() - { - public String[] apply(String input) - { - return input.split(splitChar, numCols); - } - }; - } - - public static Function, OutType> getFromMap(final KeyType key) - { - return new Function, OutType>() - { - @Override - public OutType apply(Map in) - { - return in.get(key); - } - }; - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/IteratorWithBaggage.java b/java-util/src/main/java/io/druid/java/util/common/guava/IteratorWithBaggage.java deleted file mode 100644 index 061ed23c70cc..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/IteratorWithBaggage.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import io.druid.java.util.common.parsers.CloseableIterator; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -/** - */ -public class IteratorWithBaggage implements CloseableIterator -{ - private final Iterator baseIter; - private final Closeable baggage; - - public IteratorWithBaggage( - Iterator baseIter, - Closeable baggage - ) - { - this.baseIter = baseIter; - this.baggage = baggage; - } - - @Override - public boolean hasNext() - { - return baseIter.hasNext(); - } - - @Override - public T next() - { - return baseIter.next(); - } - - @Override - public void remove() - { - baseIter.remove(); - } - - @Override - public void close() throws IOException - { - baggage.close(); - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/LimitedSequence.java index bf412259b647..5e7f005a4b80 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedSequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/LimitedSequence.java @@ -27,12 +27,12 @@ * Limits the number of inputs from this sequence. For example, if there are actually 100 things in the sequence * but the limit is set to 10, the Sequence will act as if it only had 10 things. */ -public class LimitedSequence extends YieldingSequenceBase +final class LimitedSequence extends YieldingSequenceBase { private final Sequence baseSequence; private final int limit; - public LimitedSequence( + LimitedSequence( Sequence baseSequence, int limit ) @@ -59,7 +59,7 @@ private class LimitedYielder implements Yielder private final Yielder subYielder; private final LimitedYieldingAccumulator limitedAccumulator; - public LimitedYielder( + LimitedYielder( Yielder subYielder, LimitedYieldingAccumulator limitedAccumulator ) @@ -110,7 +110,7 @@ private class LimitedYieldingAccumulator extends DelegatingYieldingA int count; boolean interruptYield = false; - public LimitedYieldingAccumulator(YieldingAccumulator accumulator) + LimitedYieldingAccumulator(YieldingAccumulator accumulator) { super(accumulator); count = 0; @@ -138,7 +138,7 @@ public OutType accumulate(OutType accumulated, T in) return retVal; } - public boolean isInterruptYield() + boolean isInterruptYield() { return interruptYield; } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java b/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java deleted file mode 100644 index 72ff3b31fa22..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -/** - * @deprecated this class uses expensive volatile counter inside, but it is not thread-safe. It is going to be removed - * in the future. - */ - -@Deprecated -public class LimitedYieldingAccumulator extends YieldingAccumulator -{ - private final int limit; - private final YieldingAccumulator delegate; - - private volatile int count = 0; - - public LimitedYieldingAccumulator( - YieldingAccumulator delegate, int limit - ) - { - this.limit = limit; - this.delegate = delegate; - } - - @Override - public void yield() - { - delegate.yield(); - } - - @Override - public boolean yielded() - { - return delegate.yielded(); - } - - @Override - public void reset() - { - delegate.reset(); - } - - @Override - public OutType accumulate(OutType accumulated, T in) - { - if (count < limit) { - count++; - return delegate.accumulate(accumulated, in); - } - return accumulated; - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingSequence.java deleted file mode 100644 index 8c9ff8070f74..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingSequence.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import java.io.Closeable; - -/** - */ -public class ResourceClosingSequence implements Sequence -{ - private final Sequence baseSequence; - private final Closeable closeable; - - public ResourceClosingSequence(Sequence baseSequence, Closeable closeable) - { - this.baseSequence = baseSequence; - this.closeable = closeable; - } - - @Override - public OutType accumulate(OutType initValue, Accumulator accumulator) - { - try { - return baseSequence.accumulate(initValue, accumulator); - } - finally { - CloseQuietly.close(closeable); - } - } - - @Override - public Yielder toYielder( - OutType initValue, YieldingAccumulator accumulator - ) - { - final Yielder baseYielder; - try { - baseYielder = baseSequence.toYielder(initValue, accumulator); - } - catch (RuntimeException e) { - CloseQuietly.close(closeable); - throw e; - } - - return new ResourceClosingYielder<>(baseYielder, closeable); - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingYielder.java b/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingYielder.java deleted file mode 100644 index a5991e2bf32a..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ResourceClosingYielder.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import java.io.Closeable; -import java.io.IOException; - - -/** - */ -public class ResourceClosingYielder implements Yielder -{ - private final Yielder baseYielder; - private final Closeable closeable; - - public ResourceClosingYielder(Yielder baseYielder, Closeable closeable) - { - this.baseYielder = baseYielder; - this.closeable = closeable; - } - - @Override - public OutType get() - { - return baseYielder.get(); - } - - @Override - public Yielder next(OutType initValue) - { - return new ResourceClosingYielder<>(baseYielder.next(initValue), closeable); - } - - @Override - public boolean isDone() - { - return baseYielder.isDone(); - } - - @Override - public void close() throws IOException - { - if (closeable != null) { - closeable.close(); - } - baseYielder.close(); - } -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/SequenceWrapper.java b/java-util/src/main/java/io/druid/java/util/common/guava/SequenceWrapper.java new file mode 100644 index 000000000000..fb9b5c014f47 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/guava/SequenceWrapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.guava; + +import com.google.common.base.Supplier; + +/** + * @see Sequences#wrap(Sequence, SequenceWrapper) + */ +public abstract class SequenceWrapper +{ + /** + * Executed before sequence processing, i. e. before {@link Sequence#accumulate} or {@link Sequence#toYielder} on the + * wrapped sequence. Default implementation does nothing. + */ + public void before() + { + // do nothing + } + + /** + * Wraps any bits of the wrapped sequence processing: {@link Sequence#accumulate} or {@link Sequence#toYielder} and + * {@link Yielder#next(Object)} on the wrapped yielder. Doesn't wrap {@link #before} and {@link #after}. + * + *

{@code sequenceProcessing.get()} must be called just once. Implementation of this method should look like + *

+   * ... do something
+   * try {
+   *   return sequenceProcessing.get();
+   * }
+   * finally {
+   *   ... do something else
+   * }
+   * 
+ */ + public RetType wrap(Supplier sequenceProcessing) throws Exception + { + return sequenceProcessing.get(); + } + + /** + * Executed after sequence processing, i. e. after {@link Sequence#accumulate} on the wrapped sequence or after {@link + * Yielder#close()} on the wrapped yielder, or if exception was thrown from any method called on the wrapped sequence + * or yielder, or from {@link #before}, or from {@link #wrap} methods of this SequenceWrapper. + * + *

Even if {@code thrown} is not null, implementation of this method shouldn't rethrow it, it is done outside. + * + * @param isDone true if all elements in the sequence were processed and no exception was thrown, false otherwise + * @param thrown an exception thrown from any method called on the wrapped sequence or yielder, or from {@link + * #before()}, or from {@link #wrap} methods of this SequenceWrapper, or null if no exception was thrown. + */ + public void after(boolean isDone, Throwable thrown) throws Exception + { + // do nothing + } +} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java index 942b51d2ef4c..c7156e3e105b 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java @@ -20,6 +20,7 @@ package io.druid.java.util.common.guava; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Lists; @@ -27,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Executor; @@ -39,7 +41,22 @@ public class Sequences public static Sequence simple(final Iterable iterable) { - return BaseSequence.simple(iterable); + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return iterable.iterator(); + } + + @Override + public void cleanup(Iterator iterFromMake) + { + + } + } + ); } @SuppressWarnings("unchecked") @@ -78,9 +95,28 @@ public static Sequence limit(final Sequence sequence, final int limit) return new LimitedSequence<>(sequence, limit); } - public static Sequence withBaggage(final Sequence seq, Closeable baggage) + public static Sequence withBaggage(final Sequence seq, final Closeable baggage) { - return new ResourceClosingSequence<>(seq, baggage); + Preconditions.checkNotNull(baggage, "baggage"); + return wrap(seq, new SequenceWrapper() + { + @Override + public void after(boolean isDone, Throwable thrown) throws Exception + { + baggage.close(); + } + }); + } + + /** + * Allows to execute something before, after or around the processing of the given sequence. See documentation to + * {@link SequenceWrapper} methods for some details. + */ + public static Sequence wrap(Sequence seq, SequenceWrapper wrapper) + { + Preconditions.checkNotNull(seq, "seq"); + Preconditions.checkNotNull(wrapper, "wrapper"); + return new WrappingSequence<>(seq, wrapper); } public static Sequence withEffect(final Sequence seq, final Runnable effect, final Executor exec) @@ -108,7 +144,7 @@ public static Sequence sort(final Sequence sequence, final Comparator< { List seqList = Sequences.toList(sequence, Lists.newArrayList()); Collections.sort(seqList, comparator); - return BaseSequence.simple(seqList); + return simple(seqList); } public static > ListType toList(Sequence seq, ListType list) diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/SimpleSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/SimpleSequence.java deleted file mode 100644 index a3aa801a9d39..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/SimpleSequence.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import java.util.Iterator; - - -/** - */ -public class SimpleSequence extends BaseSequence> -{ - public static Sequence create(Iterable iterable) - { - return new SimpleSequence<>(iterable); - } - - public SimpleSequence( - final Iterable iterable - ) - { - super( - new IteratorMaker>() - { - @Override - public Iterator make() - { - return iterable.iterator(); - } - - @Override - public void cleanup(Iterator iterFromMake) - { - - } - } - ); - } - -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/WrappingSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/WrappingSequence.java new file mode 100644 index 000000000000..a9276acaa4c5 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/guava/WrappingSequence.java @@ -0,0 +1,102 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.guava; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; + +/** + */ +final class WrappingSequence implements Sequence +{ + private final Sequence baseSequence; + private final SequenceWrapper wrapper; + + WrappingSequence(Sequence baseSequence, SequenceWrapper wrapper) + { + this.baseSequence = Preconditions.checkNotNull(baseSequence, "baseSequence"); + this.wrapper = Preconditions.checkNotNull(wrapper, "wrapper"); + } + + @Override + public OutType accumulate(final OutType outType, final Accumulator accumulator) + { + OutType result; + try { + wrapper.before(); + result = wrapper.wrap(new Supplier() + { + @Override + public OutType get() + { + return baseSequence.accumulate(outType, accumulator); + } + }); + } + catch (Throwable t) { + // Close on failure + try { + wrapper.after(false, t); + } + catch (Exception e) { + t.addSuppressed(e); + } + throw Throwables.propagate(t); + } + // "Normal" close + try { + wrapper.after(true, null); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return result; + } + + @Override + public Yielder toYielder( + final OutType initValue, + final YieldingAccumulator accumulator + ) + { + try { + wrapper.before(); + return wrapper.wrap(new Supplier>() + { + @Override + public Yielder get() + { + return new WrappingYielder<>(baseSequence.toYielder(initValue, accumulator), wrapper); + } + }); + } + catch (Throwable t) { + // Close on failure + try { + wrapper.after(false, t); + } + catch (Exception e) { + t.addSuppressed(e); + } + throw Throwables.propagate(t); + } + } +} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/WrappingYielder.java b/java-util/src/main/java/io/druid/java/util/common/guava/WrappingYielder.java new file mode 100644 index 000000000000..93015e865e5d --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/guava/WrappingYielder.java @@ -0,0 +1,103 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.guava; + +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; + +import java.io.IOException; + +final class WrappingYielder implements Yielder +{ + private final Yielder baseYielder; + private final SequenceWrapper wrapper; + + WrappingYielder(Yielder baseYielder, SequenceWrapper wrapper) + { + this.baseYielder = baseYielder; + this.wrapper = wrapper; + } + + @Override + public OutType get() + { + return baseYielder.get(); + } + + @Override + public Yielder next(final OutType initValue) + { + try { + return wrapper.wrap(new Supplier>() + { + @Override + public Yielder get() + { + return new WrappingYielder<>(baseYielder.next(initValue), wrapper); + } + }); + } + catch (Throwable t) { + // Close on failure + try { + wrapper.after(false, t); + } + catch (Exception e) { + t.addSuppressed(e); + } + throw Throwables.propagate(t); + } + } + + @Override + public boolean isDone() + { + return baseYielder.isDone(); + } + + @Override + public void close() throws IOException + { + boolean isDone; + try { + isDone = isDone(); + baseYielder.close(); + } + catch (Throwable t) { + // Close on failure + try { + wrapper.after(false, t); + } + catch (Exception e) { + t.addSuppressed(e); + } + Throwables.propagateIfInstanceOf(t, IOException.class); + throw Throwables.propagate(t); + } + // "Normal" close + try { + wrapper.after(isDone, null); + } + catch (Exception e) { + Throwables.propagateIfInstanceOf(e, IOException.class); + throw Throwables.propagate(e); + } + } +} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/YieldSign.java b/java-util/src/main/java/io/druid/java/util/common/guava/YieldSign.java deleted file mode 100644 index d40b70652aa0..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/YieldSign.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -/** - */ -public interface YieldSign -{ - public T yield(T toYield); -} diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Yielders.java b/java-util/src/main/java/io/druid/java/util/common/guava/Yielders.java index 31f30c5037d4..0ccfa64370af 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Yielders.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Yielders.java @@ -19,16 +19,15 @@ package io.druid.java.util.common.guava; -import com.google.common.io.Closeables; +import com.google.common.base.Throwables; -import java.io.Closeable; import java.io.IOException; /** */ public class Yielders { - public static Yielder done(final T finalVal, final Closeable closeable) + public static Yielder done(final T finalVal, final AutoCloseable closeable) { return new Yielder() { @@ -53,7 +52,15 @@ public boolean isDone() @Override public void close() throws IOException { - Closeables.close(closeable, false); + if (closeable != null) { + try { + closeable.close(); + } + catch (Exception e) { + Throwables.propagateIfInstanceOf(e, IOException.class); + throw Throwables.propagate(e); + } + } } }; } diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/BaseSequenceTest.java b/java-util/src/test/java/io/druid/java/util/common/guava/BaseSequenceTest.java index fbf06db6adda..6434657cefb8 100644 --- a/java-util/src/test/java/io/druid/java/util/common/guava/BaseSequenceTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/guava/BaseSequenceTest.java @@ -34,14 +34,14 @@ public class BaseSequenceTest public void testSanity() throws Exception { final List vals = Arrays.asList(1, 2, 3, 4, 5); - SequenceTestHelper.testAll(BaseSequence.simple(vals), vals); + SequenceTestHelper.testAll(Sequences.simple(vals), vals); } @Test public void testNothing() throws Exception { final List vals = Arrays.asList(); - SequenceTestHelper.testAll(BaseSequence.simple(vals), vals); + SequenceTestHelper.testAll(Sequences.simple(vals), vals); } @Test diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/ExecutorExecutingSequenceTest.java b/java-util/src/test/java/io/druid/java/util/common/guava/ExecutorExecutingSequenceTest.java deleted file mode 100644 index 18382d43790c..000000000000 --- a/java-util/src/test/java/io/druid/java/util/common/guava/ExecutorExecutingSequenceTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.Futures; -import junit.framework.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - */ -public class ExecutorExecutingSequenceTest -{ - @Test - public void testSanity() throws Exception - { - TestExecutor exec = new TestExecutor(); - final List vals = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13); - ExecutorExecutingSequence seq = new ExecutorExecutingSequence<>(Sequences.simple(vals), exec); - - SequenceTestHelper.testAccumulation("", seq, vals); - Assert.assertEquals(1, exec.getTimesCalled()); - - exec.reset(); - - SequenceTestHelper.testYield("", 3, seq, vals); - Assert.assertEquals(5, exec.getTimesCalled()); - } - - @Test - public void testSanity2() throws Exception - { - TestExecutor exec = new TestExecutor(); - final List vals = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15); - ExecutorExecutingSequence seq = new ExecutorExecutingSequence<>(Sequences.simple(vals), exec); - - SequenceTestHelper.testAccumulation("", seq, vals); - Assert.assertEquals(1, exec.getTimesCalled()); - - exec.reset(); - - SequenceTestHelper.testYield("", 3, seq, vals); - Assert.assertEquals(6, exec.getTimesCalled()); - } - - public static class TestExecutor implements ExecutorService - { - int timesCalled = 0; - - public int getTimesCalled() - { - return timesCalled; - } - - public void reset() - { - timesCalled = 0; - } - - @Override - public void shutdown() - { - throw new UnsupportedOperationException(); - } - - @Override - public List shutdownNow() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isShutdown() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isTerminated() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - throw new UnsupportedOperationException(); - } - - @Override - public Future submit(Callable task) - { - ++timesCalled; - try { - return Futures.immediateCheckedFuture(task.call()); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public Future submit(Runnable task, T result) - { - throw new UnsupportedOperationException(); - } - - @Override - public Future submit(Runnable task) - { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException - { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException - { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException - { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException - { - throw new UnsupportedOperationException(); - } - - @Override - public void execute(Runnable command) - { - throw new UnsupportedOperationException(); - } - } -} diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/TestSequence.java b/java-util/src/test/java/io/druid/java/util/common/guava/TestSequence.java index b77a750a98ae..950b5046925e 100644 --- a/java-util/src/test/java/io/druid/java/util/common/guava/TestSequence.java +++ b/java-util/src/test/java/io/druid/java/util/common/guava/TestSequence.java @@ -19,12 +19,12 @@ package io.druid.java.util.common.guava; +import java.io.Closeable; import java.util.Arrays; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** -*/ + */ public class TestSequence implements Sequence { public static TestSequence create(Iterable iterable) @@ -42,21 +42,17 @@ public static TestSequence create(T... vals) public TestSequence(final Iterable iterable) { - base = new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public Iterator make() - { - return iterable.iterator(); - } - - @Override - public void cleanup(Iterator iterFromMake) - { - closed.set(true); - } - }); + base = Sequences.withBaggage( + Sequences.simple(iterable), + new Closeable() + { + @Override + public void close() + { + closed.set(true); + } + } + ); } @Override diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java b/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java new file mode 100644 index 000000000000..81efbf8e4733 --- /dev/null +++ b/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.guava; + +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +public class WithEffectSequenceTest +{ + @Test + public void testConsistentEffectApplicationOrder() + { + final AtomicInteger effect1 = new AtomicInteger(); + final AtomicInteger effect2 = new AtomicInteger(); + final AtomicInteger counter = new AtomicInteger(); + + Sequence sequence = Sequences.withEffect( + Sequences.withEffect( + Sequences.simple(Arrays.asList(1, 2, 3)), + new Runnable() + { + @Override + public void run() + { + effect1.set(counter.incrementAndGet()); + } + }, + MoreExecutors.sameThreadExecutor() + ), + new Runnable() + { + @Override + public void run() + { + effect2.set(counter.incrementAndGet()); + } + }, + MoreExecutors.sameThreadExecutor() + ); + // Run sequence via accumulate + Sequences.toList(sequence, new ArrayList()); + Assert.assertEquals(1, effect1.get()); + Assert.assertEquals(2, effect2.get()); + + // Ensure sequence runs via Yielder, because LimitedSequence extends YieldingSequenceBase + // "Limiting" a sequence of 3 elements with 4 to let effects be executed. If e. g. limit with 1 or 2, effects are + // not executed. + Sequence yieldingSequence = Sequences.limit(sequence, 4); + Sequences.toList(yieldingSequence, new ArrayList()); + Assert.assertEquals(3, effect1.get()); + Assert.assertEquals(4, effect2.get()); + } +} diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/ResourceClosingSequenceTest.java b/java-util/src/test/java/io/druid/java/util/common/guava/WrappingSequenceTest.java similarity index 55% rename from java-util/src/test/java/io/druid/java/util/common/guava/ResourceClosingSequenceTest.java rename to java-util/src/test/java/io/druid/java/util/common/guava/WrappingSequenceTest.java index c535881f8bbe..d6bec5d28ba4 100644 --- a/java-util/src/test/java/io/druid/java/util/common/guava/ResourceClosingSequenceTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/guava/WrappingSequenceTest.java @@ -24,13 +24,14 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class ResourceClosingSequenceTest +public class WrappingSequenceTest { @Test public void testSanity() throws Exception @@ -54,4 +55,44 @@ public void close() throws IOException closedCounter.set(0); SequenceTestHelper.testClosed(closedCounter, Sequences.withBaggage(new UnsupportedSequence(), closeable)); } + + @Test + public void testConsistentCloseOrder() + { + final AtomicInteger closed1 = new AtomicInteger(); + final AtomicInteger closed2 = new AtomicInteger(); + final AtomicInteger counter = new AtomicInteger(); + + Sequence sequence = Sequences.withBaggage( + Sequences.withBaggage( + Sequences.simple(Arrays.asList(1, 2, 3)), + new Closeable() + { + @Override + public void close() throws IOException + { + closed1.set(counter.incrementAndGet()); + } + } + ), + new Closeable() + { + @Override + public void close() throws IOException + { + closed2.set(counter.incrementAndGet()); + } + } + ); + // Run sequence via accumulate + Sequences.toList(sequence, new ArrayList()); + Assert.assertEquals(1, closed1.get()); + Assert.assertEquals(2, closed2.get()); + + // Ensure sequence runs via Yielder, because LimitedSequence extends YieldingSequenceBase + Sequence yieldingSequence = Sequences.limit(sequence, 1); + Sequences.toList(yieldingSequence, new ArrayList()); + Assert.assertEquals(3, closed1.get()); + Assert.assertEquals(4, closed2.get()); + } } diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index a2366365641e..805c9b2e25ea 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -21,19 +21,15 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; - +import com.google.common.base.Supplier; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.common.utils.VMUtils; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; import io.druid.java.util.common.guava.Sequences; -import io.druid.java.util.common.guava.Yielder; -import io.druid.java.util.common.guava.YieldingAccumulator; -import java.io.Closeable; -import java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -70,76 +66,23 @@ public Sequence run( ) { final Sequence baseSequence = delegate.run(query, responseContext); - return Sequences.withBaggage( - new Sequence() + return Sequences.wrap( + baseSequence, + new SequenceWrapper() { @Override - public OutType accumulate(OutType initValue, Accumulator accumulator) + public RetType wrap(Supplier sequenceProcessing) { final long start = VMUtils.getCurrentThreadCpuTime(); try { - return baseSequence.accumulate(initValue, accumulator); - } - finally { + return sequenceProcessing.get(); + } finally { cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); } } @Override - public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) - { - final long start = VMUtils.getCurrentThreadCpuTime(); - final Yielder delegateYielder; - try { - delegateYielder = baseSequence.toYielder(initValue, accumulator); - } - finally { - cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); - } - return new Yielder() - { - @Override - public OutType get() - { - final long start = VMUtils.getCurrentThreadCpuTime(); - try { - return delegateYielder.get(); - } - finally { - cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); - } - } - - @Override - public Yielder next(OutType initValue) - { - final long start = VMUtils.getCurrentThreadCpuTime(); - try { - return delegateYielder.next(initValue); - } - finally { - cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); - } - } - - @Override - public boolean isDone() - { - return delegateYielder.isDone(); - } - - @Override - public void close() throws IOException - { - delegateYielder.close(); - } - }; - } - }, - new Closeable() - { - @Override - public void close() + public void after(boolean isDone, Throwable thrown) throws Exception { if (report) { final long cpuTime = cpuTimeAccumulator.get(); diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index f1e710193751..a943d583521e 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -34,7 +34,6 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Accumulator; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; @@ -153,7 +152,7 @@ public Void call() throws Exception return Sequences.simple(bySegmentAccumulatorPair.lhs); } - return new ResourceClosingSequence( + return Sequences.withBaggage( Sequences.simple( Iterables.transform( indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()), diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index a7558bbfa413..361375bf8ef7 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -20,14 +20,14 @@ package io.druid.query; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.java.util.common.guava.Accumulator; +import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.Sequence; -import io.druid.java.util.common.guava.Yielder; -import io.druid.java.util.common.guava.YieldingAccumulator; +import io.druid.java.util.common.guava.SequenceWrapper; +import io.druid.java.util.common.guava.Sequences; -import java.io.IOException; import java.util.Map; /** @@ -90,103 +90,32 @@ public Sequence run(final Query query, final Map responseC builder.setDimension(userDimension.getKey(), userDimension.getValue()); } - return new Sequence() - { - @Override - public OutType accumulate(OutType outType, Accumulator accumulator) - { - OutType retVal; - - long startTime = System.currentTimeMillis(); - try { - retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator); - } - catch (RuntimeException e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } - catch (Error e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } - finally { - long timeTaken = System.currentTimeMillis() - startTime; - - emitter.emit(builder.build(metricName, timeTaken)); - - if (creationTime > 0) { - emitter.emit(builder.build("query/wait/time", startTime - creationTime)); - } - } - - return retVal; - } - - @Override - public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) - { - Yielder retVal; - - long startTime = System.currentTimeMillis(); - try { - retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator); - } - catch (RuntimeException e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } - catch (Error e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } - - return makeYielder(startTime, retVal, builder); - } - - private Yielder makeYielder( - final long startTime, - final Yielder yielder, - final ServiceMetricEvent.Builder builder - ) - { - return new Yielder() + return Sequences.wrap( + // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying + // Sequence) as part of the reported query time, i. e. we want to execute queryRunner.run() after + // `startTime = System.currentTimeMillis();` (see below). + new LazySequence<>(new Supplier>() { @Override - public OutType get() + public Sequence get() { - return yielder.get(); - } - - @Override - public Yielder next(OutType initValue) - { - try { - return makeYielder(startTime, yielder.next(initValue), builder); - } - catch (RuntimeException e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } - catch (Error e) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - throw e; - } + return queryRunner.run(query, responseContext); } + }), + new SequenceWrapper() + { + private long startTime; @Override - public boolean isDone() + public void before() { - return yielder.isDone(); + startTime = System.currentTimeMillis(); } @Override - public void close() throws IOException + public void after(boolean isDone, Throwable thrown) { try { - if (!isDone() && builder.getDimension(DruidMetrics.STATUS) == null) { - builder.setDimension(DruidMetrics.STATUS, "short"); - } - long timeTaken = System.currentTimeMillis() - startTime; emitter.emit(builder.build(metricName, timeTaken)); @@ -194,12 +123,17 @@ public void close() throws IOException emitter.emit(builder.build("query/wait/time", startTime - creationTime)); } } + // Use finally block because emitting query time (in `try {}`, above) and the status (in `finally {}`, + // below) are unrelated and we don't want the latter to be skipped if the former thrown any exception. finally { - yielder.close(); + if (thrown != null) { + builder.setDimension(DruidMetrics.STATUS, "failed"); + } else if (!isDone) { + builder.setDimension(DruidMetrics.STATUS, "short"); + } } } - }; - } - }; + } + ); } } diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index a3da01300c2d..208ee1134fcc 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import io.druid.granularity.QueryGranularity; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; @@ -78,7 +77,7 @@ public static QueryRunner makeClosingQueryRunner(final QueryRunner ru @Override public Sequence run(Query query, Map responseContext) { - return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable); + return Sequences.withBaggage(runner.run(query, responseContext), closeable); } }; } diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index 5d390deddc63..a85a6f911286 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -20,8 +20,8 @@ package io.druid.query; import io.druid.java.util.common.guava.CloseQuietly; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; import io.druid.segment.ReferenceCountingSegment; import java.io.Closeable; @@ -54,7 +54,7 @@ public Sequence run(final Query query, Map responseContext try { final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); - return new ResourceClosingSequence(baseSequence, closeable); + return Sequences.withBaggage(baseSequence, closeable); } catch (RuntimeException e) { CloseQuietly.close(closeable); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index f79339f4831e..490e4c6c8a2b 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -31,7 +31,6 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.aggregation.AggregatorFactory; @@ -99,7 +98,7 @@ public static Sequence process( : new DateTime(Long.parseLong(fudgeTimestampString)); return Sequences.concat( - new ResourceClosingSequence<>( + Sequences.withBaggage( Sequences.map( cursors, new Function>() diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index cdf926b7cc5d..ff8d9b3cb601 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -33,7 +33,6 @@ import io.druid.data.input.Row; import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.GroupByMergedQueryRunner; @@ -118,7 +117,7 @@ public Sequence mergeResults( ) ); - return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); + return Sequences.withBaggage(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); } @Override @@ -201,7 +200,7 @@ public Sequence apply(Interval interval) innerQueryResultIndex.close(); - return new ResourceClosingSequence<>( + return Sequences.withBaggage( outerQuery.applyLimit(GroupByQueryHelper.postAggregate(query, outerQueryResultIndex)), outerQueryResultIndex ); diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index 1683b3de9114..c331ef7556d2 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -19,10 +19,12 @@ package io.druid.query.spec; -import com.google.common.base.Throwables; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; +import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.guava.YieldingAccumulator; @@ -35,7 +37,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; /** */ @@ -63,37 +64,28 @@ public Sequence run(final Query input, final Map responseC final String newName = String.format("%s_%s_%s", query.getType(), query.getDataSource(), query.getIntervals()); final Sequence baseSequence = doNamed( - currThread, currThreadName, newName, new Callable>() + currThread, currThreadName, newName, new Supplier>() { @Override - public Sequence call() throws Exception + public Sequence get() { return base.run(query, responseContext); } } ); - return new Sequence() + Sequence segmentMissingCatchingSequence = new Sequence() { @Override public OutType accumulate(final OutType initValue, final Accumulator accumulator) { - return doItNamed( - new Callable() - { - @Override - public OutType call() throws Exception - { - try { - return baseSequence.accumulate(initValue, accumulator); - } - catch (SegmentMissingException e) { - appendMissingSegment(responseContext); - return initValue; - } - } - } - ); + try { + return baseSequence.accumulate(initValue, accumulator); + } + catch (SegmentMissingException e) { + appendMissingSegment(responseContext); + return initValue; + } } @Override @@ -102,22 +94,13 @@ public Yielder toYielder( final YieldingAccumulator accumulator ) { - return doItNamed( - new Callable>() - { - @Override - public Yielder call() throws Exception - { - try { - return makeYielder(baseSequence.toYielder(initValue, accumulator)); - } - catch (SegmentMissingException e) { - appendMissingSegment(responseContext); - return Yielders.done(initValue, null); - } - } - } - ); + try { + return makeYielder(baseSequence.toYielder(initValue, accumulator)); + } + catch (SegmentMissingException e) { + appendMissingSegment(responseContext); + return Yielders.done(initValue, null); + } } private Yielder makeYielder(final Yielder yielder) @@ -133,16 +116,13 @@ public OutType get() @Override public Yielder next(final OutType initValue) { - return doItNamed( - new Callable>() - { - @Override - public Yielder call() throws Exception - { - return yielder.next(initValue); - } - } - ); + try { + return yielder.next(initValue); + } + catch (SegmentMissingException e) { + appendMissingSegment(responseContext); + return Yielders.done(initValue, null); + } } @Override @@ -158,12 +138,18 @@ public void close() throws IOException } }; } - - private RetType doItNamed(Callable toRun) - { - return doNamed(currThread, currThreadName, newName, toRun); - } }; + return Sequences.wrap( + segmentMissingCatchingSequence, + new SequenceWrapper() + { + @Override + public RetType wrap(Supplier sequenceProcessing) + { + return doNamed(currThread, currThreadName, newName, sequenceProcessing); + } + } + ); } private void appendMissingSegment(Map responseContext) @@ -176,14 +162,11 @@ private void appendMissingSegment(Map responseContext) missingSegments.add(specificSpec.getDescriptor()); } - private RetType doNamed(Thread currThread, String currName, String newName, Callable toRun) + private RetType doNamed(Thread currThread, String currName, String newName, Supplier toRun) { try { currThread.setName(newName); - return toRun.call(); - } - catch (Exception e) { - throw Throwables.propagate(e); + return toRun.get(); } finally { currThread.setName(currName); diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java deleted file mode 100644 index dd2e5b1d357d..000000000000 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment; - -import io.druid.java.util.common.guava.ResourceClosingYielder; -import io.druid.java.util.common.guava.Sequence; -import io.druid.java.util.common.guava.Yielder; -import io.druid.java.util.common.guava.YieldingAccumulator; -import io.druid.java.util.common.guava.YieldingSequenceBase; - -import java.io.Closeable; - -/** - */ -public class ReferenceCountingSequence extends YieldingSequenceBase -{ - private final Sequence baseSequence; - private final ReferenceCountingSegment segment; - - public ReferenceCountingSequence(Sequence baseSequence, ReferenceCountingSegment segment) - { - this.baseSequence = baseSequence; - this.segment = segment; - } - - @Override - public Yielder toYielder( - OutType initValue, YieldingAccumulator accumulator - ) - { - final Closeable closeable = segment.increment(); - return new ResourceClosingYielder(baseSequence.toYielder(initValue, accumulator), closeable); - } -} diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 8283f254503d..5b5b53695c2b 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -25,18 +25,15 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; - import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; import io.druid.java.util.common.guava.Sequences; -import io.druid.java.util.common.guava.Yielder; -import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.CacheStrategy; import io.druid.query.Druids; import io.druid.query.Query; @@ -168,20 +165,23 @@ private void testCloseAndPopulate( throws Exception { final AssertingClosable closable = new AssertingClosable(); - final Sequence resultSeq = new ResourceClosingSequence( - Sequences.simple(expectedRes), closable - ) - { - @Override - public Yielder toYielder(Object initValue, YieldingAccumulator accumulator) - { - Assert.assertFalse(closable.isClosed()); - return super.toYielder( - initValue, - accumulator - ); - } - }; + final Sequence resultSeq = Sequences.wrap( + Sequences.simple(expectedRes), + new SequenceWrapper() + { + @Override + public void before() + { + Assert.assertFalse(closable.isClosed()); + } + + @Override + public void after(boolean isDone, Throwable thrown) throws Exception + { + closable.close(); + } + } + ); Cache cache = MapCache.create(1024 * 1024); From 8d5d374d62bf304faf0b7d7c31558ef732bde6d0 Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 11 Jan 2017 18:04:42 -0600 Subject: [PATCH 2/5] Make Sequences.withEffect() execute the effect if the wrapped sequence throws exception from close() --- .../common/guava/ExecuteWhenDoneYielder.java | 24 +++++++++++-- .../java/util/common/guava/Sequences.java | 10 +----- .../common/guava/WithEffectSequenceTest.java | 36 ++++++++++++++++++- 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java b/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java index df4e7cfd92a5..2b8033c3a914 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/ExecuteWhenDoneYielder.java @@ -57,9 +57,27 @@ public boolean isDone() public void close() throws IOException { boolean done = isDone(); - baseYielder.close(); - if (done) { - executor.execute(runnable); + Throwable thrown = null; + try { + baseYielder.close(); + } + catch (Throwable t) { + thrown = t; + throw t; + } + finally { + if (done) { + if (thrown != null) { + try { + executor.execute(runnable); + } + catch (Throwable t) { + thrown.addSuppressed(t); + } + } else { + executor.execute(runnable); + } + } } } } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java index c7156e3e105b..cb4209fd7e43 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java @@ -121,16 +121,8 @@ public static Sequence wrap(Sequence seq, SequenceWrapper wrapper) public static Sequence withEffect(final Sequence seq, final Runnable effect, final Executor exec) { - return new Sequence() + return new YieldingSequenceBase() { - @Override - public OutType accumulate(OutType initValue, Accumulator accumulator) - { - final OutType out = seq.accumulate(initValue, accumulator); - exec.execute(effect); - return out; - } - @Override public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) { diff --git a/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java b/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java index 81efbf8e4733..3f91b6bc7489 100644 --- a/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/guava/WithEffectSequenceTest.java @@ -23,8 +23,11 @@ import org.junit.Assert; import org.junit.Test; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class WithEffectSequenceTest @@ -64,7 +67,8 @@ public void run() Assert.assertEquals(1, effect1.get()); Assert.assertEquals(2, effect2.get()); - // Ensure sequence runs via Yielder, because LimitedSequence extends YieldingSequenceBase + // Ensure sequence runs via Yielder, because LimitedSequence extends YieldingSequenceBase which + // implements accumulate() via yielder(). // "Limiting" a sequence of 3 elements with 4 to let effects be executed. If e. g. limit with 1 or 2, effects are // not executed. Sequence yieldingSequence = Sequences.limit(sequence, 4); @@ -72,4 +76,34 @@ public void run() Assert.assertEquals(3, effect1.get()); Assert.assertEquals(4, effect2.get()); } + + @Test + public void testEffectExecutedIfWrappedSequenceThrowsExceptionFromClose() { + Sequence baseSeq = Sequences.simple(Arrays.asList(1, 2, 3)); + Sequence throwingSeq = Sequences.withBaggage(baseSeq, new Closeable() + { + @Override + public void close() throws IOException + { + throw new RuntimeException(); + + } + }); + final AtomicBoolean effectExecuted = new AtomicBoolean(); + Sequence seqWithEffect = Sequences.withEffect(throwingSeq, new Runnable() + { + @Override + public void run() + { + effectExecuted.set(true); + } + }, MoreExecutors.sameThreadExecutor()); + try { + Sequences.toList(seqWithEffect, new ArrayList()); + Assert.fail("expected RuntimeException"); + } catch (RuntimeException e) { + // expected + Assert.assertTrue(effectExecuted.get()); + } + } } From f304d4bfda56dd53f0d1952c78cb4237ee01f68b Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 11 Jan 2017 18:10:19 -0600 Subject: [PATCH 3/5] Fix strange code in MetricsEmittingQueryRunner --- .../query/MetricsEmittingQueryRunner.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 361375bf8ef7..c35547c051df 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -115,22 +115,16 @@ public void before() @Override public void after(boolean isDone, Throwable thrown) { - try { - long timeTaken = System.currentTimeMillis() - startTime; - emitter.emit(builder.build(metricName, timeTaken)); - - if (creationTime > 0) { - emitter.emit(builder.build("query/wait/time", startTime - creationTime)); - } + if (thrown != null) { + builder.setDimension(DruidMetrics.STATUS, "failed"); + } else if (!isDone) { + builder.setDimension(DruidMetrics.STATUS, "short"); } - // Use finally block because emitting query time (in `try {}`, above) and the status (in `finally {}`, - // below) are unrelated and we don't want the latter to be skipped if the former thrown any exception. - finally { - if (thrown != null) { - builder.setDimension(DruidMetrics.STATUS, "failed"); - } else if (!isDone) { - builder.setDimension(DruidMetrics.STATUS, "short"); - } + long timeTaken = System.currentTimeMillis() - startTime; + emitter.emit(builder.build(metricName, timeTaken)); + + if (creationTime > 0) { + emitter.emit(builder.build("query/wait/time", startTime - creationTime)); } } } From db5c1d7d94fba4e869eb09baf37712ad464d41b9 Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 13 Jan 2017 16:18:03 -0600 Subject: [PATCH 4/5] Add comment on why YieldingSequenceBase is used in Sequences.withEffect() --- .../java/io/druid/java/util/common/guava/Sequences.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java index cb4209fd7e43..c132ce99d2c2 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java @@ -121,6 +121,12 @@ public static Sequence wrap(Sequence seq, SequenceWrapper wrapper) public static Sequence withEffect(final Sequence seq, final Runnable effect, final Executor exec) { + // Uses YieldingSequenceBase to be able to execute the effect if all elements of the wrapped seq are processed + // (i. e. it "is done"), but the yielder of the underlying seq throws some exception from close(). This logic could + // be found in ExecuteWhenDoneYielder.close(). If accumulate() is implemented manually in this anonymous class + // instead of extending YieldingSequenceBase, it's not possible to distinguish exception thrown during elements + // processing in accumulate() of the underlying seq, from exception thrown after all elements are processed, + // in close(). return new YieldingSequenceBase() { @Override From 1b4f8f586ea3eac08198f5cc9a58623bdb24bccd Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 13 Jan 2017 16:18:51 -0600 Subject: [PATCH 5/5] Use Closer in OrderedMergeSequence and MergeSequence to close multiple yielders --- .../main/java/io/druid/collections/OrderedMergeSequence.java | 5 ++++- .../java/io/druid/java/util/common/guava/MergeSequence.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/io/druid/collections/OrderedMergeSequence.java b/common/src/main/java/io/druid/collections/OrderedMergeSequence.java index f5e8474356bc..9bd32c9dc0c7 100644 --- a/common/src/main/java/io/druid/collections/OrderedMergeSequence.java +++ b/common/src/main/java/io/druid/collections/OrderedMergeSequence.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Ordering; +import com.google.common.io.Closer; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; @@ -207,9 +208,11 @@ public boolean isDone() @Override public void close() throws IOException { + Closer closer = Closer.create(); while(!pQueue.isEmpty()) { - pQueue.remove().close(); + closer.register(pQueue.remove()); } + closer.close(); } }; } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/MergeSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/MergeSequence.java index d0597415e332..0293927992da 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/MergeSequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/MergeSequence.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Ordering; +import com.google.common.io.Closer; import java.io.IOException; import java.util.PriorityQueue; @@ -150,9 +151,11 @@ public boolean isDone() @Override public void close() throws IOException { + Closer closer = Closer.create(); while (!pQueue.isEmpty()) { - pQueue.remove().close(); + closer.register(pQueue.remove()); } + closer.close(); } }; }