From ee9f8852d52d73566d0f45f23629e314eca2c550 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 17 Oct 2019 17:30:22 +0800 Subject: [PATCH] #8690 use UTC interval when creating pending segments --- .../common/actions/SegmentAllocateAction.java | 3 +- .../actions/SegmentAllocateActionTest.java | 29 +++++++++++++++++++ .../IndexerSQLMetadataStorageCoordinator.java | 6 ++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 678144999e5c..dc5335e1b90a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -42,6 +42,7 @@ import org.apache.druid.timeline.partition.ShardSpecFactory; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import javax.annotation.Nullable; import java.util.HashSet; @@ -187,7 +188,7 @@ public SegmentIdWithShardSpec perform( // 1) if something overlaps our timestamp, use that // 2) otherwise try preferredSegmentGranularity & going progressively smaller - final Interval rowInterval = queryGranularity.bucket(timestamp); + final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC()); final Set usedSegmentsForRow = new HashSet<>( msc.getUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 7187bdbf07e3..b35bb5d852b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -49,6 +50,7 @@ import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -980,6 +982,33 @@ public void testWithShardSpecFactoryAndOvershadowingSegments() throws IOExceptio Assert.assertEquals(ImmutableList.of("dim1"), hashBasedNumberedShardSpec.getPartitionDimensions()); } + @Test + public void testSameIntervalWithSegmentGranularity() + { + final Task task = NoopTask.create(); + taskActionTestKit.getTaskLockbox().add(task); + Granularity segmentGranularity = new PeriodGranularity(Period.hours(1), null, DateTimes.inferTzFromString("Asia/Shanghai")); + + final SegmentIdWithShardSpec id1 = allocate( + task, + PARTY_TIME, + Granularities.MINUTE, + segmentGranularity, + "s1", + null + ); + final SegmentIdWithShardSpec id2 = allocate( + task, + PARTY_TIME, + Granularities.MINUTE, + segmentGranularity, + "s2", + null + ); + Assert.assertNotNull(id1); + Assert.assertNotNull(id2); + } + private SegmentIdWithShardSpec allocate( final Task task, final DateTime timestamp, diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ad6221330508..3b878e0da083 100644 --- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -52,6 +52,7 @@ import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.ShardSpecFactory; import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -436,6 +437,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "version"); + Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); return connector.retryWithHandle( handle -> { @@ -444,7 +446,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( handle, dataSource, sequenceName, - interval, + allocateInterval, shardSpecFactory, maxVersion ); @@ -454,7 +456,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( dataSource, sequenceName, previousSegmentId, - interval, + allocateInterval, shardSpecFactory, maxVersion );