diff --git a/core/src/main/java/org/apache/druid/timeline/LogicalSegment.java b/core/src/main/java/org/apache/druid/timeline/LogicalSegment.java
index 673abb01b8e7..714eb6715685 100644
--- a/core/src/main/java/org/apache/druid/timeline/LogicalSegment.java
+++ b/core/src/main/java/org/apache/druid/timeline/LogicalSegment.java
@@ -22,8 +22,26 @@
 import org.apache.druid.guice.annotations.PublicApi;
 import org.joda.time.Interval;
 
+/**
+ * A logical segment can represent an entire segment or a part of a segment. As a result, it can have a different
+ * interval from its actual base segment. {@link #getInterval()} and {@link #getTrueInterval()} return the interval of
+ * this logical segment and the interval of the base segment, respectively.
+ *
+ * For example, suppose we have 2 segments as below:
+ *
+ * - Segment A has an interval of 2017/2018.
+ * - Segment B has an interval of 2017-08-01/2017-08-02.
+ *
+ * For these segments, {@link VersionedIntervalTimeline#lookup} returns 3 segments as below:
+ *
+ * - interval of 2017/2017-08-01 (trueInterval: 2017/2018)
+ * - interval of 2017-08-01/2017-08-02 (trueInterval: 2017-08-01/2017-08-02)
+ * - interval of 2017-08-02/2018 (trueInterval: 2017/2018)
+ */
 @PublicApi
 public interface LogicalSegment
 {
   Interval getInterval();
+
+  Interval getTrueInterval();
 }
diff --git a/core/src/main/java/org/apache/druid/timeline/TimelineObjectHolder.java b/core/src/main/java/org/apache/druid/timeline/TimelineObjectHolder.java
index 8e95fc623f77..3feca88495c3 100644
--- a/core/src/main/java/org/apache/druid/timeline/TimelineObjectHolder.java
+++ b/core/src/main/java/org/apache/druid/timeline/TimelineObjectHolder.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.timeline;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Interval;
 
@@ -27,16 +28,25 @@ public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSegment
 {
   private final Interval interval;
+  private final Interval trueInterval;
   private final VersionType version;
   private final PartitionHolder<ObjectType> object;
 
+  @VisibleForTesting
+  public TimelineObjectHolder(Interval interval, VersionType version, PartitionHolder<ObjectType> object)
+  {
+    this(interval, interval, version, object);
+  }
+
   public TimelineObjectHolder(
       Interval interval,
+      Interval trueInterval,
       VersionType version,
       PartitionHolder<ObjectType> object
   )
   {
     this.interval = interval;
+    this.trueInterval = trueInterval;
     this.version = version;
     this.object = object;
   }
@@ -47,6 +57,12 @@ public Interval getInterval()
     return interval;
   }
 
+  @Override
+  public Interval getTrueInterval()
+  {
+    return trueInterval;
+  }
+
   public VersionType getVersion()
   {
     return version;
@@ -62,6 +78,7 @@ public String toString()
   {
     return "TimelineObjectHolder{" +
            "interval=" + interval +
+           ", trueInterval=" + trueInterval +
            ", version=" + version +
            ", object=" + object +
            '}';
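The distinction documented above can be consumed directly from lookup results. A minimal sketch, assuming a `holders` list obtained from VersionedIntervalTimeline#lookup and the String/ServerSelector type parameters often used with these holders (both are assumptions, not part of the patch):

  // Sketch: detect holders that expose only part of their base segment.
  for (TimelineObjectHolder<String, ServerSelector> holder : holders) {
    if (!holder.getInterval().equals(holder.getTrueInterval())) {
      // Partial view: this holder covers holder.getInterval(), while its
      // base segment actually spans holder.getTrueInterval().
    }
  }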
diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
index 85855ad8ee01..36c177dfb0f0 100644
--- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
+++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
@@ -300,6 +300,7 @@ public TimelineObjectHolder<VersionType, ObjectType> last()
   private TimelineObjectHolder<VersionType, ObjectType> timelineEntryToObjectHolder(TimelineEntry entry)
   {
     return new TimelineObjectHolder<>(
+        entry.getTrueInterval(),
         entry.getTrueInterval(),
         entry.getVersion(),
         new PartitionHolder<>(entry.getPartitionHolder())
@@ -586,10 +587,11 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
       if (timelineInterval.overlaps(interval)) {
         retVal.add(
-            new TimelineObjectHolder(
+            new TimelineObjectHolder<>(
                 timelineInterval,
+                val.getTrueInterval(),
                 val.getVersion(),
-                new PartitionHolder(val.getPartitionHolder())
+                new PartitionHolder<>(val.getPartitionHolder())
             )
         );
       }
@@ -604,8 +606,9 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
         .isAfter(firstEntry.getInterval().getStart())) {
       retVal.set(
           0,
-          new TimelineObjectHolder(
+          new TimelineObjectHolder<>(
               new Interval(interval.getStart(), firstEntry.getInterval().getEnd()),
+              firstEntry.getTrueInterval(),
               firstEntry.getVersion(),
               firstEntry.getObject()
           )
@@ -616,8 +619,9 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
     if (interval.overlaps(lastEntry.getInterval()) && interval.getEnd().isBefore(lastEntry.getInterval().getEnd())) {
       retVal.set(
           retVal.size() - 1,
-          new TimelineObjectHolder(
+          new TimelineObjectHolder<>(
               new Interval(lastEntry.getInterval().getStart(), interval.getEnd()),
+              lastEntry.getTrueInterval(),
               lastEntry.getVersion(),
               lastEntry.getObject()
           )
diff --git a/extensions-core/druid-kerberos/pom.xml b/extensions-core/druid-kerberos/pom.xml
index b6fad3510f58..6fbdb5459084 100644
--- a/extensions-core/druid-kerberos/pom.xml
+++ b/extensions-core/druid-kerberos/pom.xml
@@ -71,6 +71,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
       <exclusions>
         <exclusion>
           <groupId>commons-cli</groupId>
diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index c60787717db9..f8eda5d7eca8 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -151,6 +151,130 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-aws</artifactId>
@@ -164,6 +288,13 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -189,13 +320,6 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.compile.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregator.java
index 94400f781bbb..c47044cd3307 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregator.java
@@ -82,7 +82,7 @@ public float getBucketSize()
   @JsonProperty
   public float getOffset()
   {
-    return bucketSize;
+    return offset;
   }
 
   @Override
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregatorTest.java
new file mode 100644
index 000000000000..ebcd2823dc81
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/BucketsPostAggregatorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BucketsPostAggregatorTest
+{
+  @Test
+  public void testSerde() throws Exception
+  {
+    BucketsPostAggregator aggregator1 =
+        new BucketsPostAggregator("buckets_post_aggregator", "test_field", 2f, 4f);
+
+    DefaultObjectMapper mapper = new DefaultObjectMapper();
+    BucketsPostAggregator aggregator2 = mapper.readValue(
+        mapper.writeValueAsString(aggregator1),
+        BucketsPostAggregator.class
+    );
+
+    Assert.assertEquals(aggregator1.getBucketSize(), aggregator2.getBucketSize(), 0.0001);
+    Assert.assertEquals(aggregator1.getOffset(), aggregator2.getOffset(), 0.0001);
+    Assert.assertArrayEquals(aggregator1.getCacheKey(), aggregator2.getCacheKey());
+    Assert.assertEquals(aggregator1.getDependentFields(), aggregator2.getDependentFields());
+  }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
index 4fd54b499566..84a76d15e41a 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
@@ -174,7 +174,10 @@ private void selfCheckingMove(
               .withPrefix(s3Path)
               .withMaxKeys(1)
       );
-      if (listResult.getKeyCount() == 0) {
+      // Use getObjectSummaries().size() instead of getKeyCount(): in some cases
+      // getKeyCount() has been observed to be zero even though getObjectSummaries()
+      // returns data.
+      if (listResult.getObjectSummaries().size() == 0) {
        // should never happen
        throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
      }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index e0a3dac8cae5..97858864b149 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -251,7 +251,10 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS3 s3Client, String bucket, String key)
         .withMaxKeys(1);
 
     final ListObjectsV2Result result = s3Client.listObjectsV2(request);
-    if (result.getKeyCount() == 0) {
+    // Use getObjectSummaries().size() instead of getKeyCount(): in some cases
+    // getKeyCount() has been observed to be zero even though getObjectSummaries()
+    // returns data.
+    if (result.getObjectSummaries().size() == 0) {
       throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
     }
     final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
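For reference, here is the defensive listing pattern used by both call sites as a self-contained sketch against the AWS SDK for Java v1 (the class name, bucket, and prefix are placeholders, not part of the patch):

  import com.amazonaws.services.s3.AmazonS3;
  import com.amazonaws.services.s3.AmazonS3ClientBuilder;
  import com.amazonaws.services.s3.model.ListObjectsV2Request;
  import com.amazonaws.services.s3.model.ListObjectsV2Result;

  public class ListCheckSketch
  {
    public static boolean prefixHasObjects(AmazonS3 s3, String bucket, String prefix)
    {
      final ListObjectsV2Result result = s3.listObjectsV2(
          new ListObjectsV2Request()
              .withBucketName(bucket)
              .withPrefix(prefix)
              .withMaxKeys(1)
      );
      // Check the summaries list itself rather than getKeyCount(), which the
      // patch notes can be zero even when summaries are present.
      return !result.getObjectSummaries().isEmpty();
    }

    public static void main(String[] args)
    {
      AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
      System.out.println(prefixHasObjects(s3, "my-bucket", "some/prefix"));
    }
  }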
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1fd923c99bfe..aecacbf7f9d4 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -83,6 +83,22 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
@@ -130,22 +146,6 @@
       <version>${hadoop.compile.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-server</artifactId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 398ed96a2f39..62c23e734a53 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -430,21 +430,12 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    File hadoopJobIdFile = new File(getHadoopJobIdFileName());
-    String jobId = null;
+    // Avoid issuing the kill command once the ingestion task has actually completed.
+    if (!ingestionState.equals(IngestionState.COMPLETED)) {
+      final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+      String hadoopJobIdFile = getHadoopJobIdFileName();
 
-    try {
-      if (hadoopJobIdFile.exists()) {
-        jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
-    }
-
-    try {
-      if (jobId != null) {
+      try {
         ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
             taskConfig.getDefaultHadoopCoordinates());
@@ -452,28 +443,28 @@ public void stopGracefully(TaskConfig taskConfig)
             "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
             loader
         );
+
         String[] buildKillJobInput = new String[]{
-            "-kill",
-            jobId
+            hadoopJobIdFile
         };
 
         Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
         Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
 
         Thread.currentThread().setContextClassLoader(loader);
-        final String killStatusString = (String) innerProcessingRunTask.invoke(
+        final String[] killStatusString = (String[]) innerProcessingRunTask.invoke(
             killMRJobInnerProcessingRunner,
             new Object[]{buildKillJobInput}
         );
 
-        log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString));
+        log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1]));
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        Thread.currentThread().setContextClassLoader(oldLoader);
       }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
     }
   }
@@ -722,10 +713,29 @@ public Map<String, Object> getStats()
   @SuppressWarnings("unused")
   public static class HadoopKillMRJobIdProcessingRunner
   {
-    public String runTask(String[] args) throws Exception
+    public String[] runTask(String[] args) throws Exception
     {
-      int res = ToolRunner.run(new JobClient(), args);
-      return res == 0 ? "Success" : "Fail";
+      File hadoopJobIdFile = new File(args[0]);
+      String jobId = null;
+
+      try {
+        if (hadoopJobIdFile.exists()) {
+          jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+        }
+      }
+      catch (Exception e) {
+        log.warn(e, "exception while reading hadoop job id from: [%s]", hadoopJobIdFile);
+      }
+
+      if (jobId != null) {
+        int res = ToolRunner.run(new JobClient(), new String[]{
+            "-kill",
+            jobId
+        });
+
+        return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+      }
+      return new String[] {jobId, "Fail"};
     }
   }
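The kill path above relies on a classloader-isolation idiom: build an isolated loader, reflectively invoke the runner inside it, and always restore the caller's context classloader. A stripped-down sketch of that idiom under stated assumptions (`isolatedLoader` stands in for the HadoopTask.buildClassLoader result, and `jobIdFilePath` for the job-id file name):

  import java.lang.reflect.Method;

  // Sketch: reflective invocation under a swapped context classloader.
  ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
  try {
    Class<?> runnerClass = Class.forName(
        "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
        true,
        isolatedLoader // assumption: built via HadoopTask.buildClassLoader(...)
    );
    Object runner = runnerClass.newInstance();
    Method runTask = runnerClass.getMethod("runTask", String[].class);
    Thread.currentThread().setContextClassLoader(isolatedLoader);
    String[] status = (String[]) runTask.invoke(runner, new Object[]{new String[]{jobIdFilePath}});
  }
  finally {
    // Restore the original loader no matter what happened above.
    Thread.currentThread().setContextClassLoader(oldLoader);
  }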
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index b8402fab3bed..787852ddfbd5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1073,16 +1073,13 @@ public boolean equals(Object o)
         return true;
       }
 
-      if (!getClass().equals(o.getClass())) {
+      if (o == null || !getClass().equals(o.getClass())) {
         return false;
       }
 
-      final TaskLockPosse that = (TaskLockPosse) o;
-      if (!taskLock.equals(that.taskLock)) {
-        return false;
-      }
-
-      return taskIds.equals(that.taskIds);
+      TaskLockPosse that = (TaskLockPosse) o;
+      return java.util.Objects.equals(taskLock, that.taskLock) &&
+             java.util.Objects.equals(taskIds, that.taskIds);
     }
 
     @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 560425512513..a2a14a006bba 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -667,6 +667,36 @@ public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
     Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked());
   }
 
+  @Test
+  public void testLockPosseEquals()
+  {
+    final Task task1 = NoopTask.create();
+    final Task task2 = NoopTask.create();
+
+    TaskLock taskLock1 = new TaskLock(
+        TaskLockType.EXCLUSIVE,
+        task1.getGroupId(),
+        task1.getDataSource(),
+        Intervals.of("2018/2019"),
+        "v1",
+        task1.getPriority()
+    );
+
+    TaskLock taskLock2 = new TaskLock(
+        TaskLockType.EXCLUSIVE,
+        task2.getGroupId(),
+        task2.getDataSource(),
+        Intervals.of("2018/2019"),
+        "v2",
+        task2.getPriority()
+    );
+
+    TaskLockPosse taskLockPosse1 = new TaskLockPosse(taskLock1);
+    TaskLockPosse taskLockPosse2 = new TaskLockPosse(taskLock2);
+    TaskLockPosse taskLockPosse3 = new TaskLockPosse(taskLock1);
+
+    Assert.assertNotEquals(taskLockPosse1, null);
+    Assert.assertNotEquals(null, taskLockPosse1);
+    Assert.assertNotEquals(taskLockPosse1, taskLockPosse2);
+    Assert.assertEquals(taskLockPosse1, taskLockPosse3);
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()
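The rewritten equals now guards against null and delegates to java.util.Objects; pairing it with a matching hashCode keeps the equals/hashCode contract intact. A generic sketch of the pattern (the class and field names here are hypothetical, not from the patch):

  import java.util.Objects;

  class Posse
  {
    private final Object lock;
    private final Object ids;

    Posse(Object lock, Object ids)
    {
      this.lock = lock;
      this.ids = ids;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      // Check null before touching o.getClass(), as in the TaskLockPosse fix.
      if (o == null || !getClass().equals(o.getClass())) {
        return false;
      }
      Posse that = (Posse) o;
      return Objects.equals(lock, that.lock) && Objects.equals(ids, that.ids);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(lock, ids);
    }
  }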
diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
index e1befab71ad3..7b951c83647a 100644
--- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
@@ -22,9 +22,6 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -42,6 +39,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  */
@@ -68,19 +66,9 @@ public <T extends LogicalSegment> List<T> filterSegments(DataSourceMetadataQuery query, List<T> segments)
     final T max = segments.get(segments.size() - 1);
 
-    return Lists.newArrayList(
-        Iterables.filter(
-            segments,
-            new Predicate<T>()
-            {
-              @Override
-              public boolean apply(T input)
-              {
-                return max != null && input.getInterval().overlaps(max.getInterval());
-              }
-            }
-        )
-    );
+    return segments.stream()
+                   .filter(input -> max != null && input.getInterval().overlaps(max.getTrueInterval()))
+                   .collect(Collectors.toList());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 13d5521b2c2e..351903799f85 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -23,8 +23,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.DateTimes;
@@ -46,6 +44,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  */
@@ -85,20 +84,10 @@ public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> segments)
     final T min = query.isMaxTime() ? null : segments.get(0);
     final T max = query.isMinTime() ? null : segments.get(segments.size() - 1);
 
-    return Lists.newArrayList(
-        Iterables.filter(
-            segments,
-            new Predicate<T>()
-            {
-              @Override
-              public boolean apply(T input)
-              {
-                return (min != null && input.getInterval().overlaps(min.getInterval())) ||
-                       (max != null && input.getInterval().overlaps(max.getInterval()));
-              }
-            }
-        )
-    );
+    return segments.stream()
+                   .filter(input -> (min != null && input.getInterval().overlaps(min.getTrueInterval())) ||
+                                    (max != null && input.getInterval().overlaps(max.getTrueInterval())))
+                   .collect(Collectors.toList());
   }
 
   @Override
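To see why both filters now compare against getTrueInterval() rather than getInterval(), take the example from the LogicalSegment javadoc. A small worked sketch with Joda-Time intervals (values are from that javadoc; the class name is hypothetical):

  import org.joda.time.Interval;

  public class TrueIntervalExample
  {
    public static void main(String[] args)
    {
      // Lookup split segment A (2017/2018) around segment B, so the "max"
      // holder covers only the 2017-08-02/2018 slice of A.
      Interval h1 = Interval.parse("2017-01-01/2017-08-01");              // first slice of A
      Interval maxInterval = Interval.parse("2017-08-02/2018-01-01");     // last slice of A
      Interval maxTrueInterval = Interval.parse("2017-01-01/2018-01-01"); // all of A

      System.out.println(h1.overlaps(maxInterval));     // false: h1 would be dropped
      System.out.println(h1.overlaps(maxTrueInterval)); // true: h1 is kept
    }
  }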
diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
index 2a23b547f660..3f20c25f87cd 100644
--- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
@@ -164,6 +164,12 @@ public Interval getInterval()
           {
             return Intervals.of("2012-01-01/P1D");
           }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return getInterval();
+          }
         },
         new LogicalSegment()
         {
@@ -172,6 +178,12 @@ public Interval getInterval()
           {
             return Intervals.of("2012-01-01T01/PT1H");
           }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return getInterval();
+          }
         },
         new LogicalSegment()
         {
@@ -180,6 +192,12 @@ public Interval getInterval()
           {
             return Intervals.of("2013-01-01/P1D");
           }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return getInterval();
+          }
         },
         new LogicalSegment()
         {
@@ -188,6 +206,12 @@ public Interval getInterval()
           {
             return Intervals.of("2013-01-01T01/PT1H");
           }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return getInterval();
+          }
         },
         new LogicalSegment()
         {
@@ -196,6 +220,12 @@ public Interval getInterval()
           {
             return Intervals.of("2013-01-01T02/PT1H");
           }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return getInterval();
+          }
         }
     )
 );
@@ -210,6 +240,12 @@ public Interval getInterval()
         {
           return Intervals.of("2013-01-01/P1D");
         }
+
+        @Override
+        public Interval getTrueInterval()
+        {
+          return getInterval();
+        }
       },
       new LogicalSegment()
       {
@@ -218,6 +254,12 @@ public Interval getInterval()
         {
           return Intervals.of("2013-01-01T02/PT1H");
         }
+
+        @Override
+        public Interval getTrueInterval()
+        {
+          return getInterval();
+        }
       }
   );
@@ -226,6 +268,143 @@
     }
   }
 
+  @Test
+  public void testFilterOverlappingSegments()
+  {
+    final GenericQueryMetricsFactory queryMetricsFactory = DefaultGenericQueryMetricsFactory.instance();
+    final DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest(queryMetricsFactory);
+    final List<LogicalSegment> segments = toolChest
+        .filterSegments(
+            null,
+            ImmutableList.of(
+                new LogicalSegment()
+                {
+                  @Override
+                  public Interval getInterval()
+                  {
+                    return Intervals.of("2015/2016-08-01");
+                  }
+
+                  @Override
+                  public Interval getTrueInterval()
+                  {
+                    return Intervals.of("2015/2016-08-01");
+                  }
+                },
+                new LogicalSegment()
+                {
+                  @Override
+                  public Interval getInterval()
+                  {
+                    return Intervals.of("2016-08-01/2017");
+                  }
+
+                  @Override
+                  public Interval getTrueInterval()
+                  {
+                    return Intervals.of("2016-08-01/2017");
+                  }
+                },
+                new LogicalSegment()
+                {
+                  @Override
+                  public Interval getInterval()
+                  {
+                    return Intervals.of("2017/2017-08-01");
+                  }
+
+                  @Override
+                  public Interval getTrueInterval()
+                  {
+                    return Intervals.of("2017/2018");
+                  }
+                },
+                new LogicalSegment()
+                {
+                  @Override
+                  public Interval getInterval()
+                  {
+                    return Intervals.of("2017-08-01/2017-08-02");
+                  }
+
+                  @Override
+                  public Interval getTrueInterval()
+                  {
+                    return Intervals.of("2017-08-01/2017-08-02");
+                  }
+                },
+                new LogicalSegment()
+                {
+                  @Override
+                  public Interval getInterval()
+                  {
+                    return Intervals.of("2017-08-02/2018");
+                  }
+
+                  @Override
+                  public Interval getTrueInterval()
+                  {
+                    return Intervals.of("2017/2018");
+                  }
+                }
+            )
+        );
+
+    final List<LogicalSegment> expected = ImmutableList.of(
+        new LogicalSegment()
+        {
+          @Override
+          public Interval getInterval()
+          {
+            return Intervals.of("2017/2017-08-01");
+          }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return Intervals.of("2017/2018");
+          }
+        },
+        new LogicalSegment()
+        {
+          @Override
+          public Interval getInterval()
+          {
+            return Intervals.of("2017-08-01/2017-08-02");
+          }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return Intervals.of("2017-08-01/2017-08-02");
+          }
+        },
+        new LogicalSegment()
+        {
+          @Override
+          public Interval getInterval()
+          {
+            return Intervals.of("2017-08-02/2018");
+          }
+
+          @Override
+          public Interval getTrueInterval()
+          {
+            return Intervals.of("2017/2018");
+          }
+        }
+    );
+
+    Assert.assertEquals(expected.size(), segments.size());
+
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals(expected.get(i).getInterval(), segments.get(i).getInterval());
+      Assert.assertEquals(expected.get(i).getTrueInterval(), segments.get(i).getTrueInterval());
+    }
+  }
+
   @Test
   public void testResultSerialization()
   {
Intervals.of("2013-01-05/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -947,6 +965,12 @@ public Interval getInterval() { return Intervals.of("2013-05-20/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -955,6 +979,12 @@ public Interval getInterval() { return Intervals.of("2014-01-05/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -963,6 +993,12 @@ public Interval getInterval() { return Intervals.of("2014-02-05/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -971,6 +1007,12 @@ public Interval getInterval() { return Intervals.of("2015-01-19T01/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -979,6 +1021,12 @@ public Interval getInterval() { return Intervals.of("2015-01-20T02/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } } ); @@ -998,6 +1046,12 @@ public Interval getInterval() { return Intervals.of("2015-01-19T01/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -1006,6 +1060,12 @@ public Interval getInterval() { return Intervals.of("2015-01-20T02/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } } ); @@ -1031,6 +1091,12 @@ public Interval getInterval() { return Intervals.of("2013-05-20/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -1039,6 +1105,12 @@ public Interval getInterval() { return Intervals.of("2014-01-05/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -1047,6 +1119,12 @@ public Interval getInterval() { return Intervals.of("2014-02-05/P1D"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -1055,6 +1133,12 @@ public Interval getInterval() { return Intervals.of("2015-01-19T01/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } }, new LogicalSegment() { @@ -1063,6 +1147,12 @@ public Interval getInterval() { return Intervals.of("2015-01-20T02/PT1H"); } + + @Override + public Interval getTrueInterval() + { + return getInterval(); + } } ); diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java index b62fb283c7f8..6ab886fe084a 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java @@ -73,6 +73,11 @@ public class TimeBoundaryQueryQueryToolChestTest .build(); private static LogicalSegment createLogicalSegment(final Interval interval) + { + return createLogicalSegment(interval, interval); + } + + private static LogicalSegment createLogicalSegment(final Interval interval, final Interval trueInterval) { return new LogicalSegment() { @@ -81,6 +86,12 @@ public Interval getInterval() { return interval; } + + @Override + public Interval getTrueInterval() + { + return trueInterval; + } }; } @@ -116,6 +127,35 @@ public 
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
index b62fb283c7f8..6ab886fe084a 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
@@ -73,6 +73,11 @@ public class TimeBoundaryQueryQueryToolChestTest
       .build();
 
   private static LogicalSegment createLogicalSegment(final Interval interval)
+  {
+    return createLogicalSegment(interval, interval);
+  }
+
+  private static LogicalSegment createLogicalSegment(final Interval interval, final Interval trueInterval)
   {
     return new LogicalSegment()
     {
@@ -81,6 +86,12 @@ public Interval getInterval()
       {
         return interval;
       }
+
+      @Override
+      public Interval getTrueInterval()
+      {
+        return trueInterval;
+      }
     };
   }
 
@@ -116,6 +127,35 @@ public void testFilterSegments()
     }
   }
 
+  @Test
+  public void testFilterOverlappingSegments()
+  {
+    final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
+        TIME_BOUNDARY_QUERY,
+        Arrays.asList(
+            createLogicalSegment(Intervals.of("2015/2016-08-01")),
+            createLogicalSegment(Intervals.of("2016-08-01/2017")),
+            createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+            createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+            createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
+        )
+    );
+
+    final List<LogicalSegment> expected = Arrays.asList(
+        createLogicalSegment(Intervals.of("2015/2016-08-01")),
+        createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+        createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+        createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
+    );
+
+    Assert.assertEquals(expected.size(), actual.size());
+
+    for (int i = 0; i < actual.size(); i++) {
+      Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
+      Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
+    }
+  }
+
   @Test
   public void testMaxTimeFilterSegments()
   {
@@ -145,6 +185,62 @@ public void testMaxTimeFilterSegments()
     }
   }
 
+  @Test
+  public void testMaxTimeFilterOverlappingSegments()
+  {
+    final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
+        MAXTIME_BOUNDARY_QUERY,
+        Arrays.asList(
+            createLogicalSegment(Intervals.of("2015/2016-08-01")),
+            createLogicalSegment(Intervals.of("2016-08-01/2017")),
+            createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+            createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+            createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
+        )
+    );
+
+    final List<LogicalSegment> expected = Arrays.asList(
+        createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+        createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+        createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
+    );
+
+    Assert.assertEquals(expected.size(), actual.size());
+
+    for (int i = 0; i < actual.size(); i++) {
+      Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
+      Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
+    }
+  }
+
+  @Test
+  public void testMinTimeFilterOverlappingSegments()
+  {
+    final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
+        MINTIME_BOUNDARY_QUERY,
+        Arrays.asList(
+            createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+            createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+            createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018")),
+            createLogicalSegment(Intervals.of("2018/2018-08-01")),
+            createLogicalSegment(Intervals.of("2018-08-01/2019"))
+        )
+    );
+
+    final List<LogicalSegment> expected = Arrays.asList(
+        createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
+        createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
+        createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
+    );
+
+    Assert.assertEquals(expected.size(), actual.size());
+
+    for (int i = 0; i < actual.size(); i++) {
+      Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
+      Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
+    }
+  }
+
   @Test
   public void testMinTimeFilterSegments()
   {
@@ -192,6 +288,7 @@ public void testFilteredFilterSegments()
     Assert.assertEquals(7, segments.size());
   }
 
+  @Test
   public void testCacheStrategy() throws Exception
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 3efbd8d0074e..4aaa95e12a63 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import com.amazonaws.annotation.GuardedBy;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
@@ -62,7 +64,6 @@
 import org.apache.druid.sql.calcite.view.DruidViewMacro;
 import org.apache.druid.sql.calcite.view.ViewManager;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -94,8 +95,9 @@ public class DruidSchema extends AbstractSchema
   private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
   private static final int MAX_SEGMENTS_PER_QUERY = 15000;
-  private static final long IS_PUBLISHED = 0;
-  private static final long IS_AVAILABLE = 1;
+  private static final long DEFAULT_IS_PUBLISHED = 0;
+  private static final long DEFAULT_IS_AVAILABLE = 1;
+  private static final long DEFAULT_NUM_ROWS = 0;
 
   private final QueryLifecycleFactory queryLifecycleFactory;
   private final PlannerConfig config;
@@ -106,12 +108,12 @@ public class DruidSchema extends AbstractSchema
   // For awaitInitialization.
   private final CountDownLatch initialized = new CountDownLatch(1);
 
-  // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
+  // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
   private final Object lock = new Object();
 
   // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
   // Use TreeMap for segments so they are merged in deterministic order, from older to newer.
-  // This data structure need to be accessed in a thread-safe way since SystemSchema accesses it
+  @GuardedBy("lock")
   private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
   private int totalSegments = 0;
@@ -350,7 +352,8 @@ protected Multimap<String, org.apache.calcite.schema.Function> getFunctionMultimap()
     return builder.build();
   }
 
-  private void addSegment(final DruidServerMetadata server, final DataSegment segment)
+  @VisibleForTesting
+  void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
     synchronized (lock) {
       final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
@@ -359,16 +362,18 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segment)
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-        final Map<SegmentId, Set<String>> serverSegmentMap = ImmutableMap.of(
+        final Set<String> servers = ImmutableSet.of(server.getName());
+        holder = SegmentMetadataHolder.builder(
             segment.getId(),
-            ImmutableSet.of(server.getName())
-        );
-
-        holder = SegmentMetadataHolder
-            .builder(segment.getId(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, serverSegmentMap)
-            .build();
+            DEFAULT_IS_PUBLISHED,
+            DEFAULT_IS_AVAILABLE,
+            isRealtime,
+            servers,
+            null,
+            DEFAULT_NUM_ROWS
+        ).build();
         // Unknown segment.
-        setSegmentSignature(segment, holder);
+        setSegmentMetadataHolder(segment, holder);
         segmentsNeedingRefresh.add(segment);
         if (!server.segmentReplicatable()) {
           log.debug("Added new mutable segment[%s].", segment.getId());
@@ -377,14 +382,14 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segment)
           log.debug("Added new immutable segment[%s].", segment.getId());
         }
       } else {
-        final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
+        final Set<String> segmentServers = holder.getReplicas();
         final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
-            .addAll(segmentServerMap.get(segment.getId()))
+            .addAll(segmentServers)
             .add(server.getName())
             .build();
         final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
             .from(holder)
-            .withReplicas(ImmutableMap.of(segment.getId(), servers))
+            .withReplicas(servers)
             .build();
         knownSegments.put(segment, holderWithNumReplicas);
         if (server.segmentReplicatable()) {
@@ -402,7 +407,8 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segment)
     }
   }
 
-  private void removeSegment(final DataSegment segment)
+  @VisibleForTesting
+  void removeSegment(final DataSegment segment)
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone.", segment.getId());
@@ -433,13 +439,13 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
       final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
       final SegmentMetadataHolder holder = knownSegments.get(segment);
-      final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
-      final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getId()))
+      final Set<String> segmentServers = holder.getReplicas();
+      final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
           .filter(Predicates.not(Predicates.equalTo(server.getName())))
           .toSet();
       final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
           .from(holder)
-          .withReplicas(ImmutableMap.of(segment.getId(), servers))
+          .withReplicas(servers)
           .build();
       knownSegments.put(segment, holderWithNumReplicas);
       lock.notifyAll();
@@ -450,7 +456,8 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
   /**
    * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
    * which may be a subset of the asked-for set.
   */
-  private Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+  @VisibleForTesting
+  Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
   {
     final Set<DataSegment> retVal = new HashSet<>();
@@ -506,15 +513,26 @@ private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, final Set<DataSegment> segments)
       log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
       final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
-      SegmentMetadataHolder holder = dataSourceSegments.get(segment);
-      SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
-          .from(holder)
-          .withRowSignature(rowSignature)
-          .withNumRows(analysis.getNumRows())
-          .build();
-      dataSourceSegments.put(segment, updatedHolder);
-      setSegmentSignature(segment, updatedHolder);
-      retVal.add(segment);
+      if (dataSourceSegments == null) {
+        log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
+      } else {
+        SegmentMetadataHolder holder = dataSourceSegments.get(segment);
+        if (holder == null) {
+          log.warn(
+              "No segment[%s] found, skipping refresh",
+              segment.getId()
+          );
+        } else {
+          SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
+              .from(holder)
+              .withRowSignature(rowSignature)
+              .withNumRows(analysis.getNumRows())
+              .build();
+          dataSourceSegments.put(segment, updatedHolder);
+          setSegmentMetadataHolder(segment, updatedHolder);
+          retVal.add(segment);
+        }
+      }
     }
@@ -536,7 +554,8 @@ private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, final Set<DataSegment> segments)
     return retVal;
   }
 
-  private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
+  @VisibleForTesting
+  void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
   {
     synchronized (lock) {
       TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
@@ -628,7 +647,7 @@ private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
     return rowSignatureBuilder.build();
   }
 
-  public Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
+  Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
   {
     final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
     synchronized (lock) {
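The new @GuardedBy annotation documents which monitor protects segmentMetadataInfo; it is purely declarative, for readers and static analysis. A minimal illustration of the convention, using the same com.amazonaws.annotation.GuardedBy import the diff adds (the class here is hypothetical):

  import com.amazonaws.annotation.GuardedBy;

  import java.util.HashMap;
  import java.util.Map;

  class MetadataCache
  {
    private final Object lock = new Object();

    // Convention documented by the annotation: only touch this map while holding `lock`.
    @GuardedBy("lock")
    private final Map<String, String> entries = new HashMap<>();

    String get(String key)
    {
      synchronized (lock) {
        return entries.get(key);
      }
    }

    void put(String key, String value)
    {
      synchronized (lock) {
        entries.put(key, value);
      }
    }
  }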
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index f2d5ab313b5c..38ff92858ec0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -23,7 +23,6 @@
 import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -36,15 +35,25 @@ public static Builder builder(
       long isPublished,
       long isAvailable,
       long isRealtime,
-      Map<SegmentId, Set<String>> segmentServerMap
+      Set<String> segmentServers,
+      RowSignature rowSignature,
+      long numRows
   )
   {
-    return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServerMap);
+    return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
   }
 
   public static Builder from(SegmentMetadataHolder h)
   {
-    return new Builder(h.getSegmentId(), h.isPublished(), h.isAvailable(), h.isRealtime(), h.getReplicas());
+    return new Builder(
+        h.getSegmentId(),
+        h.isPublished(),
+        h.isAvailable(),
+        h.isRealtime(),
+        h.getReplicas(),
+        h.getRowSignature(),
+        h.getNumRows()
+    );
   }
 
   private final SegmentId segmentId;
@@ -54,8 +63,8 @@ public static Builder from(SegmentMetadataHolder h)
   private final long isPublished;
   private final long isAvailable;
   private final long isRealtime;
-  //segmentId -> set of servers that contain the segment
-  private final Map<SegmentId, Set<String>> segmentServerMap;
+  // set of servers that contain the segment
+  private final Set<String> segmentServers;
   private final long numRows;
   @Nullable
   private final RowSignature rowSignature;
@@ -66,7 +75,7 @@ private SegmentMetadataHolder(Builder builder)
     this.isPublished = builder.isPublished;
     this.isAvailable = builder.isAvailable;
     this.isRealtime = builder.isRealtime;
-    this.segmentServerMap = builder.segmentServerMap;
+    this.segmentServers = builder.segmentServers;
     this.numRows = builder.numRows;
     this.segmentId = builder.segmentId;
   }
@@ -91,14 +100,14 @@ public SegmentId getSegmentId()
     return segmentId;
   }
 
-  public Map<SegmentId, Set<String>> getReplicas()
+  public Set<String> getReplicas()
   {
-    return segmentServerMap;
+    return segmentServers;
   }
 
-  public long getNumReplicas(SegmentId segmentId)
+  public long getNumReplicas()
   {
-    return segmentServerMap.get(segmentId).size();
+    return segmentServers.size();
   }
 
   public long getNumRows()
@@ -119,7 +128,7 @@ public static class Builder
     private final long isAvailable;
     private final long isRealtime;
 
-    private Map<SegmentId, Set<String>> segmentServerMap;
+    private Set<String> segmentServers;
     @Nullable
     private RowSignature rowSignature;
     private long numRows;
@@ -129,14 +138,18 @@ private Builder(
         long isPublished,
         long isAvailable,
         long isRealtime,
-        Map<SegmentId, Set<String>> segmentServerMap
+        Set<String> servers,
+        RowSignature rowSignature,
+        long numRows
     )
     {
       this.segmentId = segmentId;
      this.isPublished = isPublished;
      this.isAvailable = isAvailable;
      this.isRealtime = isRealtime;
-      this.segmentServerMap = segmentServerMap;
+      this.segmentServers = servers;
+      this.rowSignature = rowSignature;
+      this.numRows = numRows;
     }
 
     public Builder withRowSignature(RowSignature rowSignature)
@@ -151,9 +164,9 @@ public Builder withNumRows(long numRows)
       return this;
     }
 
-    public Builder withReplicas(Map<SegmentId, Set<String>> segmentServerMap)
+    public Builder withReplicas(Set<String> servers)
     {
-      this.segmentServerMap = segmentServerMap;
+      this.segmentServers = servers;
       return this;
     }
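A usage sketch of the reworked builder: construct a holder for a newly seen segment with the defaults DruidSchema passes, then derive an updated copy once a refresh reports the row count (segmentId, rowSignature, and the server name are placeholders, not from the patch):

  // Initial holder: unpublished, available, non-realtime, one replica, no signature yet.
  SegmentMetadataHolder holder = SegmentMetadataHolder
      .builder(segmentId, 0L, 1L, 0L, ImmutableSet.of("historical-1"), null, 0L)
      .build();

  // After a metadata refresh: copy everything, then override what changed.
  SegmentMetadataHolder refreshed = SegmentMetadataHolder
      .from(holder)
      .withRowSignature(rowSignature)
      .withNumRows(42L)
      .build();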
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index d0599f861903..e895113f8505 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -224,7 +224,7 @@ public Enumerable<Object[]> scan(DataContext root)
           Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
       for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
-            new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows());
+            new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
         partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
       }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 1b4e00819bb6..e707c40aeb5c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -27,6 +27,8 @@
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
@@ -40,6 +42,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.table.DruidTable;
@@ -63,6 +66,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class DruidSchemaTest extends CalciteTestBase
 {
@@ -83,6 +87,8 @@ public class DruidSchemaTest extends CalciteTestBase
   private static QueryRunnerFactoryConglomerate conglomerate;
   private static Closer resourceCloser;
 
+  private List<ImmutableDruidServer> druidServers;
+
   @BeforeClass
   public static void setUpClass()
   {
@@ -162,10 +168,12 @@ public void setUp() throws Exception
         index2
     );
 
+    final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments());
+    druidServers = serverView.getDruidServers();
+
     schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-        new TestServerInventoryView(walker.getSegments()),
+        serverView,
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -237,4 +245,100 @@ public void testGetTableMapFoo2()
     Assert.assertEquals("m1", fields.get(2).getName());
     Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
   }
+
+  /**
+   * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in the case
+   * of multiple replicas, i.e., when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
+   * is called more than once for the same segment.
+   */
+  @Test
+  public void testSegmentMetadataHolderNumRows()
+  {
+    Map<DataSegment, SegmentMetadataHolder> segmentsMetadata = schema.getSegmentMetadata();
+    final Set<DataSegment> segments = segmentsMetadata.keySet();
+    Assert.assertEquals(3, segments.size());
+    // find the only segment with datasource "foo2"
+    final DataSegment existingSegment = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo2"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertNotNull(existingSegment);
+    final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment);
+    // update the SegmentMetadataHolder of existingSegment with numRows=5
+    SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
+    schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
+    // find a druidServer holding existingSegment
+    final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
+        .stream()
+        .flatMap(druidServer -> druidServer
+            .getSegments()
+            .stream()
+            .filter(segment -> segment.equals(existingSegment))
+            .map(segment -> Pair.of(druidServer, segment)))
+        .findAny()
+        .orElse(null);
+    Assert.assertNotNull(pair);
+    final ImmutableDruidServer server = pair.lhs;
+    Assert.assertNotNull(server);
+    final DruidServerMetadata druidServerMetadata = server.getMetadata();
+    // invoke DruidSchema#addSegment on existingSegment
+    schema.addSegment(druidServerMetadata, existingSegment);
+    segmentsMetadata = schema.getSegmentMetadata();
+    // get the only segment with datasource "foo2"
+    final DataSegment currentSegment = segments.stream()
+                                               .filter(segment -> segment.getDataSource().equals("foo2"))
+                                               .findFirst()
+                                               .orElse(null);
+    final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment);
+    Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId());
+    Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows());
+    // numReplicas does not change here since addSegment is called with the same server that was already serving existingSegment
+    Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas());
+    Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable());
+    Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished());
+  }
+
+  @Test
+  public void testNullDatasource() throws IOException
+  {
+    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Set<DataSegment> segments = segmentMetadatas.keySet();
+    Assert.assertEquals(segments.size(), 3);
+    // segments contains two segments with datasource "foo" and one with datasource "foo2"
+    // let's remove the only segment with datasource "foo2"
+    final DataSegment segmentToRemove = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo2"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertFalse(segmentToRemove == null);
+    schema.removeSegment(segmentToRemove);
+    schema.refreshSegments(segments); // can cause NPE without the dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
+    segmentMetadatas = schema.getSegmentMetadata();
+    segments = segmentMetadatas.keySet();
+    Assert.assertEquals(segments.size(), 2);
+  }
+
+  @Test
+  public void testNullSegmentMetadataHolder() throws IOException
+  {
+    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Set<DataSegment> segments = segmentMetadatas.keySet();
+    Assert.assertEquals(segments.size(), 3);
+    // remove one of the segments with datasource "foo"
+    final DataSegment segmentToRemove = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertFalse(segmentToRemove == null);
+    schema.removeSegment(segmentToRemove);
+    schema.refreshSegments(segments); // can cause NPE without the holder null check in SegmentMetadataHolder#from
+    segmentMetadatas = schema.getSegmentMetadata();
+    segments = segmentMetadatas.keySet();
+    Assert.assertEquals(segments.size(), 2);
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 7d8cdaad5729..d354f141d9af 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -118,6 +118,11 @@ public class SystemSchemaTest extends CalciteTestBase
       CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
   );
 
+  private static final List<InputRow> ROWS3 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "7.0", "dim3", ImmutableList.of("x"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "8.0", "dim3", ImmutableList.of("xyz")))
+  );
+
   private SystemSchema schema;
   private SpecificSegmentsQuerySegmentWalker walker;
   private DruidLeaderClient client;
@@ -204,11 +209,22 @@ public Authorizer getAuthorizer(String name)
         )
         .rows(ROWS2)
         .buildMMappedIndex();
+    final QueryableIndex index3 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "3"))
+                                              .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                              .schema(
+                                                  new IncrementalIndexSchema.Builder()
+                                                      .withMetrics(new LongSumAggregatorFactory("m1", "m1"))
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(ROWS3)
+                                              .buildMMappedIndex();
 
     walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
         .add(segment1, index1)
         .add(segment2, index2)
-        .add(segment3, index2);
+        .add(segment3, index3);
 
     druidSchema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
@@ -469,7 +485,7 @@ public Object get(String name)
         100L,
         2L, //partition_num
         1L, //num_replicas
-        3L, //numRows
+        2L, //numRows
         0L, //is_published
         1L, //is_available
         0L //is_realtime
@@ -481,7 +497,7 @@ public Object get(String name)
         100L,
         0L, //partition_num
         1L, //num_replicas
-        0L, //numRows = 3
+        0L, //numRows
         0L, //is_published
         1L, //is_available
         1L //is_realtime
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 6718b1bd1f80..2dcc56959837 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -20,7 +20,9 @@
 package org.apache.druid.sql.calcite.util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.client.selector.ServerSelector;
@@ -33,6 +35,7 @@
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -83,7 +86,14 @@ public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
   @Override
   public List<ImmutableDruidServer> getDruidServers()
   {
-    throw new UnsupportedOperationException();
+    final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments);
+    final ImmutableDruidServer server = new ImmutableDruidServer(
+        DUMMY_SERVER,
+        0L,
+        ImmutableMap.of("src", dataSource),
+        1
+    );
+    return ImmutableList.of(server);
   }
 
   @Override