diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index cd9261057f8c..839682aceb59 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -49,6 +49,7 @@ import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; +import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -132,6 +133,8 @@ public class RealtimePlumber implements Plumber private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; + private static final String SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment"; + public RealtimePlumber( DataSchema schema, @@ -255,6 +258,7 @@ private Sink getSink(long timestamp) @Override public QueryRunner getQueryRunner(final Query query) { + final boolean skipIncrementalSegment = query.getContextValue(SKIP_INCREMENTAL_SEGMENT, false); final QueryRunnerFactory> factory = conglomerate.findFactory(query); final QueryToolChest> toolchest = factory.getToolchest(); @@ -322,6 +326,10 @@ public QueryRunner apply(FireHydrant input) return new ReportTimelineMissingSegmentQueryRunner(descriptor); } + if (skipIncrementalSegment && !input.hasSwapped()) { + return new NoopQueryRunner(); + } + // Prevent the underlying segment from closing when its being iterated final Closeable closeable = input.getSegment().increment(); try {