diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 4b2a93ef031d..82d745e61f66 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -29,9 +29,7 @@ import com.google.common.primitives.Ints; import com.metamx.common.guava.CloseQuietly; import com.metamx.emitter.EmittingLogger; -import io.druid.data.input.Committer; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.*; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -54,6 +52,7 @@ import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.segment.realtime.firehose.FirehoseV2CloseBeforeStartException; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; @@ -116,6 +115,9 @@ private static String makeDatasource(FireDepartment fireDepartment) @JsonIgnore private volatile Firehose firehose = null; + @JsonIgnore + private volatile FirehoseV2 firehoseV2 = null; + @JsonIgnore private volatile FireDepartmentMetrics metrics = null; @@ -321,32 +323,70 @@ public String getVersion(final Interval interval) Supplier committerSupplier = null; try { - plumber.startJob(); + Object metadata = plumber.startJob(); // Set up metrics emission toolbox.getMonitorScheduler().addMonitor(metricsMonitor); - // Delay firehose connection to avoid claiming input resources while the plumber is starting up. - final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory(); - final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory); + if (fireDepartment.checkFirehoseV2()) + { + final boolean firehoseV2DrainableByClosing = + isFirehoseV2DrainableByClosing(spec.getIOConfig().getFirehoseFactoryV2()); + boolean normalStart = true; + + // Skip connecting firehose if we've been stopped before we got started. + synchronized (this) { + if (!gracefullyStopped) { + firehoseV2 = fireDepartment.connect(metadata); + committerSupplier = Committers.supplierFromFirehoseV2(firehoseV2); + try { + firehoseV2.start(); + } catch (FirehoseV2CloseBeforeStartException e) + { + normalStart = false; + } + } + } - // Skip connecting firehose if we've been stopped before we got started. - synchronized (this) { - if (!gracefullyStopped) { - firehose = firehoseFactory.connect(spec.getDataSchema().getParser()); - committerSupplier = Committers.supplierFromFirehose(firehose); + if (normalStart) + { + // Time to read data! + while (firehoseV2 != null && (!gracefullyStopped || firehoseV2DrainableByClosing)) + { + Plumbers.addNextRowV2( + committerSupplier, + firehoseV2, + plumber, + tuningConfig.isReportParseExceptions(), + metrics + ); + + if (!firehoseV2.advance()) break; + } + } + } else { + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. + final boolean firehoseDrainableByClosing = + isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory()); + + // Skip connecting firehose if we've been stopped before we got started. + synchronized (this) { + if (!gracefullyStopped) { + firehose = fireDepartment.connect(); + committerSupplier = Committers.supplierFromFirehose(firehose); + } } - } - // Time to read data! - while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { - Plumbers.addNextRow( - committerSupplier, - firehose, - plumber, - tuningConfig.isReportParseExceptions(), - metrics - ); + // Time to read data! + while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { + Plumbers.addNextRow( + committerSupplier, + firehose, + plumber, + tuningConfig.isReportParseExceptions(), + metrics + ); + } } } catch (Throwable e) { @@ -359,7 +399,7 @@ public String getVersion(final Interval interval) if (normalExit) { try { // Persist if we had actually started. - if (firehose != null) { + if (firehose != null || firehoseV2 != null) { log.info("Persisting remaining data."); final Committer committer = committerSupplier.get(); @@ -417,6 +457,9 @@ public void run() if (firehose != null) { CloseQuietly.close(firehose); } + if (firehoseV2 != null) { + CloseQuietly.close(firehoseV2); + } toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); } } @@ -439,14 +482,17 @@ public void stopGracefully() synchronized (this) { if (!gracefullyStopped) { gracefullyStopped = true; - if (firehose == null) { + if (firehose == null && firehoseV2 == null) { log.info("stopGracefully: Firehose not started yet, so nothing to stop."); } else if (finishingJob) { log.info("stopGracefully: Interrupting finishJob."); runThread.interrupt(); - } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + } else if (firehose != null && isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { log.info("stopGracefully: Draining firehose."); firehose.close(); + } else if (firehoseV2 != null && isFirehoseV2DrainableByClosing(spec.getIOConfig().getFirehoseFactoryV2())) { + log.info("stopGracefully: Draining firehoseV2."); + firehoseV2.close(); } else { log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread."); runThread.interrupt(); @@ -468,6 +514,15 @@ public Firehose getFirehose() return firehose; } + /** + * Public for tests. + */ + @JsonIgnore + public FirehoseV2 getFirehoseV2() + { + return firehoseV2; + } + /** * Public for tests. */ @@ -500,6 +555,19 @@ && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory). && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate())); } + /** + * Is a firehoseV2 from this factory drainable by closing it? If so, we should drain on stopGracefully rather than + * abruptly stopping. + * + * This is a hack to get around the fact that the FirehoseV2 and FirehoseFactoryV2 interfaces do not help us do this. + * And, currently no FirehoseFactoryV2 implementation supports drainable by closing yet. + * Protected for tests. + */ + protected boolean isFirehoseV2DrainableByClosing(FirehoseFactoryV2 firehoseFactoryV2) + { + return false; + } + public static class TaskActionSegmentPublisher implements SegmentPublisher { final Task task; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 1c40b7b6d320..443962fb1be3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -45,10 +46,7 @@ import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; import io.druid.concurrent.Execs; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.*; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.SegmentLoaderFactory; @@ -98,6 +96,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.firehose.FirehoseV2CloseBeforeStartException; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; @@ -212,6 +211,89 @@ public void close() throws IOException } } + private static class TestFirehoseV2 implements FirehoseV2 + { + private final List queue = Lists.newLinkedList(); + private boolean closed = false; + private InputRow currRow; + + public void addRows(List rows) + { + synchronized (this) { + queue.addAll(rows); + notifyAll(); + } + } + + @Override + public InputRow currRow() + { + if (currRow != null && currRow.getDimensions().contains(FAIL_DIM)) { + throw new ParseException(FAIL_DIM); + } + return currRow; + } + + @Override + public boolean advance() + { + try { + synchronized (this) { + while (queue.isEmpty() && !closed) { + wait(); + } + if (!queue.isEmpty()) { + currRow = queue.remove(0); + return true; + } else { + currRow = null; + return false; + } + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public Committer makeCommitter() + { + return new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + } + }; + } + + @Override + public void start() throws Exception + { + if (!advance()) + { + throw new FirehoseV2CloseBeforeStartException("stopped before start"); + } + } + + @Override + public void close() throws IOException + { + synchronized (this) { + closed = true; + notifyAll(); + } + } + } + private static class TestFirehoseFactory implements FirehoseFactory { public TestFirehoseFactory() @@ -225,6 +307,71 @@ public Firehose connect(InputRowParser parser) throws IOException, ParseExceptio } } + private static class TestFirehoseFactoryV2 implements FirehoseFactoryV2 + { + public TestFirehoseFactoryV2() + { + } + + @Override + public FirehoseV2 connect(InputRowParser inputRowParser, Object o) throws IOException, ParseException + { + return new TestFirehoseV2(); + } + } + + private static class TestTaskFirehoseHolder + { + private TestFirehose firehose; + private TestFirehoseV2 firehoseV2; + final boolean isV2; + final RealtimeIndexTask task; + + public TestTaskFirehoseHolder( + RealtimeIndexTask task + ) + { + FirehoseFactory firehoseFactory = task.getRealtimeIngestionSchema().getIOConfig().getFirehoseFactory(); + FirehoseFactoryV2 firehoseFactoryV2 = task.getRealtimeIngestionSchema().getIOConfig().getFirehoseFactoryV2(); + + Preconditions.checkArgument((firehoseFactory == null) ^ (firehoseFactoryV2 == null), + "either firehoseFactory or firehoseFactoryV2 specified, not both"); + isV2 = firehoseFactoryV2 != null; + this.task = task; + } + + public void addRows(List rows) + { + if(isV2) { + Preconditions.checkArgument(firehoseV2 != null, "should use after firehose is connected"); + firehoseV2.addRows(rows); + } else { + Preconditions.checkArgument(firehose != null, "should use after firehose is connected"); + firehose.addRows(rows); + } + } + + public Object getFirehose() + { + if (isV2) { + firehoseV2 = (TestFirehoseV2)task.getFirehoseV2(); + return firehoseV2; + } else { + firehose = (TestFirehose)task.getFirehose(); + return firehose; + } + } + + public void close() throws IOException + { + if(isV2) { + firehoseV2.close(); + } else { + firehose.close(); + } + } + } + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -232,23 +379,32 @@ public Firehose connect(InputRowParser parser) throws IOException, ParseExceptio public final TemporaryFolder tempFolder = new TemporaryFolder(); private final boolean buildV9Directly; + private final FirehoseFactory firehoseFactory; + private final FirehoseFactoryV2 firehoseFactoryV2; private DateTime now; private ListeningExecutorService taskExec; private Map> handOffCallbacks; - @Parameterized.Parameters(name = "buildV9Directly = {0}") + @Parameterized.Parameters(name = "buildV9Directly = {0}, firehoseFactory = {1}, firehoseFactoryV2 = {2}") public static Collection constructorFeeder() throws IOException { return ImmutableList.of( - new Object[]{true}, - new Object[]{false} + new Object[]{true, new TestFirehoseFactory(), null}, + new Object[]{true, null, new TestFirehoseFactoryV2()}, + new Object[]{false, new TestFirehoseFactory(), null}, + new Object[]{false, null, new TestFirehoseFactoryV2()} ); } - public RealtimeIndexTaskTest(boolean buildV9Directly) + public RealtimeIndexTaskTest( + boolean buildV9Directly, + FirehoseFactory firehoseFactory, + FirehoseFactoryV2 firehoseFactoryV2) { this.buildV9Directly = buildV9Directly; + this.firehoseFactory = firehoseFactory; + this.firehoseFactoryV2 = firehoseFactoryV2; } @Before @@ -292,13 +448,12 @@ public void testHandoffTimeout() throws Exception final ListenableFuture statusFuture = runTask(task, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -309,7 +464,7 @@ public void testHandoffTimeout() throws Exception ); // Stop the firehose, this will drain out existing events. - firehose.close(); + holder.close(); // Wait for publish. while (mdc.getPublished().isEmpty()) { @@ -334,13 +489,12 @@ public void testBasics() throws Exception final DataSegment publishedSegment; // Wait for firehose to show up, it starts off null. - while (task.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -361,7 +515,7 @@ public void testBasics() throws Exception ); // Stop the firehose, this will drain out existing events. - firehose.close(); + holder.close(); // Wait for publish. while (mdc.getPublished().isEmpty()) { @@ -408,13 +562,12 @@ public void testReportParseExceptionsOnBadMetric() throws Exception final ListenableFuture statusFuture = runTask(task, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -440,7 +593,7 @@ public void testReportParseExceptionsOnBadMetric() throws Exception ); // Stop the firehose, this will drain out existing events. - firehose.close(); + holder.close(); // Wait for the task to finish. expectedException.expect(ExecutionException.class); @@ -475,13 +628,12 @@ public void testNoReportParseExceptions() throws Exception final DataSegment publishedSegment; // Wait for firehose to show up, it starts off null. - while (task.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task.getFirehose(); - - firehose.addRows( + holder.addRows( Arrays.asList( // Good row- will be processed. new MapBasedInputRow( @@ -519,7 +671,7 @@ public void testNoReportParseExceptions() throws Exception ); // Stop the firehose, this will drain out existing events. - firehose.close(); + holder.close(); // Wait for publish. while (mdc.getPublished().isEmpty()) { @@ -571,13 +723,12 @@ public void testRestore() throws Exception final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task1.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task1); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task1.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -606,16 +757,15 @@ public void testRestore() throws Exception final ListenableFuture statusFuture = runTask(task2, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task2.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task2); + while (holder.getFirehose() == null) { Thread.sleep(50); } // Do a query, at this point the previous data should be loaded. Assert.assertEquals(1, sumMetric(task2, "rows")); - final TestFirehose firehose = (TestFirehose) task2.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -626,7 +776,7 @@ public void testRestore() throws Exception ); // Stop the firehose, this will drain out existing events. - firehose.close(); + holder.close(); // Wait for publish. while (mdc.getPublished().isEmpty()) { @@ -674,13 +824,12 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task1.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task1); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task1.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -691,7 +840,7 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception ); // Stop the firehose, this will trigger a finishJob. - firehose.close(); + holder.close(); // Wait for publish. while (mdc.getPublished().isEmpty()) { @@ -719,15 +868,13 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception final ListenableFuture statusFuture = runTask(task2, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task2.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task2); + while (holder.getFirehose() == null) { Thread.sleep(50); } - // Stop the firehose again, this will start another handoff. - final TestFirehose firehose = (TestFirehose) task2.getFirehose(); - // Stop the firehose, this will trigger a finishJob. - firehose.close(); + holder.close(); // publishedSegment is still published. No reason it shouldn't be. Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished()); @@ -771,13 +918,12 @@ public void testRestoreCorruptData() throws Exception final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. - while (task1.getFirehose() == null) { + TestTaskFirehoseHolder holder = new TestTaskFirehoseHolder(task1); + while (holder.getFirehose() == null) { Thread.sleep(50); } - final TestFirehose firehose = (TestFirehose) task1.getFirehose(); - - firehose.addRows( + holder.addRows( ImmutableList.of( new MapBasedInputRow( now, @@ -892,9 +1038,9 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa objectMapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( - new TestFirehoseFactory(), + firehoseFactory, null, - null + firehoseFactoryV2 ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, @@ -924,6 +1070,12 @@ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) { return true; } + + @Override + protected boolean isFirehoseV2DrainableByClosing(FirehoseFactoryV2 firehoseFactoryV2) + { + return true; + } }; } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/FirehoseV2CloseBeforeStartException.java b/server/src/main/java/io/druid/segment/realtime/firehose/FirehoseV2CloseBeforeStartException.java new file mode 100644 index 000000000000..57499fd3e04d --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/FirehoseV2CloseBeforeStartException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2011,2012 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.realtime.firehose; + +public class FirehoseV2CloseBeforeStartException extends RuntimeException +{ + public FirehoseV2CloseBeforeStartException(String formatText, Object... arguments) + { + super(String.format(formatText, arguments)); + } + + public FirehoseV2CloseBeforeStartException(Throwable cause, String formatText, Object... arguments) + { + super(String.format(formatText, arguments), cause); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java index 5f54d4d5a2b0..42727e719848 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java @@ -25,6 +25,7 @@ import com.metamx.common.parsers.ParseException; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseV2; import io.druid.data.input.InputRow; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -38,6 +39,27 @@ private Plumbers() // No instantiation } + public static void addNextRowV2( + final Supplier committerSupplier, + final FirehoseV2 firehose, + final Plumber plumber, + final boolean reportParseExceptions, + final FireDepartmentMetrics metrics + ) + { + try { + InputRow inputRow = firehose.currRow(); + addRow(committerSupplier, inputRow, plumber, reportParseExceptions, metrics); + } catch (ParseException e) { + if (reportParseExceptions) { + throw e; + } else { + log.debug(e, "Discarded row due to exception, considering unparseable."); + metrics.incrementUnparseable(); + } + } + } + public static void addNextRow( final Supplier committerSupplier, final Firehose firehose, @@ -46,20 +68,27 @@ public static void addNextRow( final FireDepartmentMetrics metrics ) { - final InputRow inputRow; try { - inputRow = firehose.nextRow(); - } - catch (ParseException e) { + InputRow inputRow = firehose.nextRow(); + addRow(committerSupplier, inputRow, plumber, reportParseExceptions, metrics); + } catch (ParseException e) { if (reportParseExceptions) { throw e; } else { log.debug(e, "Discarded row due to exception, considering unparseable."); metrics.incrementUnparseable(); - return; } } + } + private static void addRow( + final Supplier committerSupplier, + final InputRow inputRow, + final Plumber plumber, + final boolean reportParseExceptions, + final FireDepartmentMetrics metrics + ) + { if (inputRow == null) { if (reportParseExceptions) { throw new ParseException("null input row"); @@ -70,22 +99,29 @@ public static void addNextRow( } } - final int numRows; try { - numRows = plumber.add(inputRow, committerSupplier); - } - catch (IndexSizeExceededException e) { + // Included in ParseException try/catch, as additional parsing can be done during indexing. + int numRows = plumber.add(inputRow, committerSupplier); + + if (numRows == -1) { + metrics.incrementThrownAway(); + log.debug("Discarded row[%s], considering thrownAway.", inputRow); + return; + } + + metrics.incrementProcessed(); + } catch (ParseException e) { + if (reportParseExceptions) { + throw e; + } else { + log.debug(e, "Discarded row due to exception, considering unparseable."); + metrics.incrementUnparseable(); + return; + } + } catch (IndexSizeExceededException e) { // Shouldn't happen if this is only being called by a single thread. // plumber.add should be swapping out indexes before they fill up. throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!"); } - - if (numRows == -1) { - metrics.incrementThrownAway(); - log.debug("Discarded row[%s], considering thrownAway.", inputRow); - return; - } - - metrics.incrementProcessed(); } }