From ccb03c00a7d0a01a750115d4751e685ec6bcbf67 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 19 Nov 2019 11:33:46 -0800 Subject: [PATCH 1/6] Optimize CachingLocalSegmentAllocator#getSequenceName Replace StringUtils#format with plain string concatenation when generating the sequence name for an interval and partition. This is faster because format parses the format string with a regex Matcher on every call in order to substitute the arguments. --- .../common/task/CachingLocalSegmentAllocatorHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java index 1963fb4c2fdc..7c66cb3172e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java @@ -133,6 +133,6 @@ public String getSequenceName(Interval interval, InputRow inputRow) */ private String getSequenceName(Interval interval, ShardSpec shardSpec) { - return StringUtils.format("%s_%s_%d", taskId, interval, shardSpec.getPartitionNum()); + return taskId + "_" + interval + "_" + shardSpec.getPartitionNum(); } } From 7c2209585677ccb55f0f69ed253c9c410c3aaa03 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 19 Nov 2019 15:17:43 -0800 Subject: [PATCH 2/6] fix imports and add test --- .../CachingLocalSegmentAllocatorHelper.java | 1 - .../CachingLocalSegmentAllocatorTest.java | 130 ++++++++++++++++++ 2 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java index 7c66cb3172e3..00f951a8bcb1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.common.actions.SurrogateAction; import org.apache.druid.indexing.common.task.IndexTask.ShardSpecs; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java new file mode 100644 index 000000000000..5016af6f9974 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.LockListAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.timeline.partition.HashBasedNumberedShardSpecFactory; +import org.apache.druid.timeline.partition.ShardSpecFactory; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class CachingLocalSegmentAllocatorTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String TASK_ID = "TASK_ID"; + private static final String DATASOURCE = "DATASOURCE"; + private static final Interval INTERVAL = Intervals.utc(0, 1000); + private static final String VERSION = "version"; + private static final String DIMENSION = "dim"; + private static final List PARTITION_DIMENSIONS = ImmutableList.of(DIMENSION); + private static final int NUM_PARTITONS = 1; + private static final ShardSpecFactory SHARD_SPEC_FACTORY = new HashBasedNumberedShardSpecFactory( + PARTITION_DIMENSIONS, + NUM_PARTITONS + ); + private static final Map> ALLOCATE_SPEC = ImmutableMap.of( + INTERVAL, 
Pair.of(SHARD_SPEC_FACTORY, NUM_PARTITONS) + ); + + private InputRow row; + + private CachingLocalSegmentAllocator target; + + @Before + public void setUp() throws IOException + { + TaskToolbox toolbox = createToolbox(); + row = createInputRow(); + + target = new CachingLocalSegmentAllocator(toolbox, TASK_ID, DATASOURCE, ALLOCATE_SPEC); + } + + @Test + public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() + { + String sequenceName = target.getSequenceName(INTERVAL, row); + String expectedSequenceName = StringUtils.format("%s_%s_%d", TASK_ID, INTERVAL, 0); + Assert.assertEquals(expectedSequenceName, sequenceName); + } + + private static TaskToolbox createToolbox() + { + TaskToolbox toolbox = EasyMock.mock(TaskToolbox.class); + EasyMock.expect(toolbox.getTaskActionClient()).andStubReturn(createTaskActionClient()); + EasyMock.expect(toolbox.getObjectMapper()).andStubReturn(OBJECT_MAPPER); + EasyMock.replay(toolbox); + return toolbox; + } + + private static TaskActionClient createTaskActionClient() + { + List taskLocks = Collections.singletonList(createTaskLock()); + + try { + TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(EasyMock.anyObject(LockListAction.class))).andStubReturn(taskLocks); + EasyMock.replay(taskActionClient); + return taskActionClient; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static TaskLock createTaskLock() + { + TaskLock taskLock = EasyMock.mock(TaskLock.class); + EasyMock.expect(taskLock.getInterval()).andStubReturn(INTERVAL); + EasyMock.expect(taskLock.getVersion()).andStubReturn(VERSION); + EasyMock.replay(taskLock); + return taskLock; + } + + private static InputRow createInputRow() + { + long timestamp = INTERVAL.getStartMillis(); + InputRow inputRow = EasyMock.mock(InputRow.class); + EasyMock.expect(inputRow.getTimestamp()).andStubReturn(DateTimes.utc(timestamp)); + 
EasyMock.expect(inputRow.getTimestampFromEpoch()).andStubReturn(timestamp); + EasyMock.expect(inputRow.getDimension(DIMENSION)).andStubReturn(Collections.singletonList(DIMENSION)); + EasyMock.replay(inputRow); + return inputRow; + } +} From c04f68985a172e85132044867afecc09a4638b41 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 19 Nov 2019 15:28:45 -0800 Subject: [PATCH 3/6] Add comment about optimization --- .../common/task/CachingLocalSegmentAllocatorHelper.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java index 00f951a8bcb1..3c9bab2d9f5c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorHelper.java @@ -132,6 +132,8 @@ public String getSequenceName(Interval interval, InputRow inputRow) */ private String getSequenceName(Interval interval, ShardSpec shardSpec) { + // Note: We do not use String format here since this can be called in a tight loop + // and it's faster to add strings together than it is to use String#format return taskId + "_" + interval + "_" + shardSpec.getPartitionNum(); } } From 82a2c86519d74b57f25b8d82568ab1a6660cf75a Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 19 Nov 2019 17:15:05 -0800 Subject: [PATCH 4/6] Use renamed function for TaskToolbox --- .../indexing/common/task/CachingLocalSegmentAllocatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java index 5016af6f9974..d01e47d4c34c 100644 
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java @@ -88,7 +88,7 @@ private static TaskToolbox createToolbox() { TaskToolbox toolbox = EasyMock.mock(TaskToolbox.class); EasyMock.expect(toolbox.getTaskActionClient()).andStubReturn(createTaskActionClient()); - EasyMock.expect(toolbox.getObjectMapper()).andStubReturn(OBJECT_MAPPER); + EasyMock.expect(toolbox.getJsonMapper()).andStubReturn(OBJECT_MAPPER); EasyMock.replay(toolbox); return toolbox; } From 9d29652f875c10867419ef3fe596bce913f99496 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Mon, 16 Dec 2019 17:13:57 -0800 Subject: [PATCH 5/6] Move tests after refactor --- .../CachingLocalSegmentAllocatorTest.java | 130 ------------------ ...itionCachingLocalSegmentAllocatorTest.java | 11 ++ ...itionCachingLocalSegmentAllocatorTest.java | 11 ++ 3 files changed, 22 insertions(+), 130 deletions(-) delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java deleted file mode 100644 index d01e47d4c34c..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocatorTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.timeline.partition.HashBasedNumberedShardSpecFactory; -import org.apache.druid.timeline.partition.ShardSpecFactory; -import org.easymock.EasyMock; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class CachingLocalSegmentAllocatorTest -{ - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String TASK_ID = "TASK_ID"; - private static final String DATASOURCE = "DATASOURCE"; - private static final Interval INTERVAL = Intervals.utc(0, 1000); - private static final String VERSION = "version"; - private static final String DIMENSION = "dim"; - private static final List PARTITION_DIMENSIONS 
= ImmutableList.of(DIMENSION); - private static final int NUM_PARTITONS = 1; - private static final ShardSpecFactory SHARD_SPEC_FACTORY = new HashBasedNumberedShardSpecFactory( - PARTITION_DIMENSIONS, - NUM_PARTITONS - ); - private static final Map> ALLOCATE_SPEC = ImmutableMap.of( - INTERVAL, Pair.of(SHARD_SPEC_FACTORY, NUM_PARTITONS) - ); - - private InputRow row; - - private CachingLocalSegmentAllocator target; - - @Before - public void setUp() throws IOException - { - TaskToolbox toolbox = createToolbox(); - row = createInputRow(); - - target = new CachingLocalSegmentAllocator(toolbox, TASK_ID, DATASOURCE, ALLOCATE_SPEC); - } - - @Test - public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() - { - String sequenceName = target.getSequenceName(INTERVAL, row); - String expectedSequenceName = StringUtils.format("%s_%s_%d", TASK_ID, INTERVAL, 0); - Assert.assertEquals(expectedSequenceName, sequenceName); - } - - private static TaskToolbox createToolbox() - { - TaskToolbox toolbox = EasyMock.mock(TaskToolbox.class); - EasyMock.expect(toolbox.getTaskActionClient()).andStubReturn(createTaskActionClient()); - EasyMock.expect(toolbox.getJsonMapper()).andStubReturn(OBJECT_MAPPER); - EasyMock.replay(toolbox); - return toolbox; - } - - private static TaskActionClient createTaskActionClient() - { - List taskLocks = Collections.singletonList(createTaskLock()); - - try { - TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); - EasyMock.expect(taskActionClient.submit(EasyMock.anyObject(LockListAction.class))).andStubReturn(taskLocks); - EasyMock.replay(taskActionClient); - return taskActionClient; - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private static TaskLock createTaskLock() - { - TaskLock taskLock = EasyMock.mock(TaskLock.class); - EasyMock.expect(taskLock.getInterval()).andStubReturn(INTERVAL); - EasyMock.expect(taskLock.getVersion()).andStubReturn(VERSION); - 
EasyMock.replay(taskLock); - return taskLock; - } - - private static InputRow createInputRow() - { - long timestamp = INTERVAL.getStartMillis(); - InputRow inputRow = EasyMock.mock(InputRow.class); - EasyMock.expect(inputRow.getTimestamp()).andStubReturn(DateTimes.utc(timestamp)); - EasyMock.expect(inputRow.getTimestampFromEpoch()).andStubReturn(timestamp); - EasyMock.expect(inputRow.getDimension(DIMENSION)).andStubReturn(Collections.singletonList(DIMENSION)); - EasyMock.replay(inputRow); - return inputRow; - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java index 6e91d10066af..30925e32e7e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.PartitionBoundaries; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; @@ -141,6 +142,16 @@ public void allocatesCorrectShardSpecsForLastPartition() testAllocate(row, interval, partitionNum, null); } + @Test + public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() + { + Interval interval = INTERVAL_NORMAL; + InputRow row = createInputRow(interval, PARTITION9); + String sequenceName = target.getSequenceName(interval, row); + String expectedSequenceName = StringUtils.format("%s_%s_%d", TASKID, 
interval, 1); + Assert.assertEquals(expectedSequenceName, sequenceName); + } + @SuppressWarnings("SameParameterValue") private void testAllocate(InputRow row, Interval interval, int partitionNum) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java index e82101d7386a..2a868c858273 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -101,6 +102,16 @@ public void allocatesCorrectShardSpec() throws IOException Assert.assertEquals(PARTITION_NUM, shardSpec.getPartitionNum()); } + + @Test + public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() + { + InputRow row = createInputRow(); + String sequenceName = target.getSequenceName(INTERVAL, row); + String expectedSequenceName = StringUtils.format("%s_%s_%d", TASKID, INTERVAL, PARTITION_NUM); + Assert.assertEquals(expectedSequenceName, sequenceName); + } + private static TaskToolbox createToolbox() { TaskToolbox toolbox = EasyMock.mock(TaskToolbox.class); From 38c604e87cc78b47fa908857873e9125aa198034 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 18 Dec 2019 09:40:40 -0800 Subject: [PATCH 6/6] Rename 
tests --- .../task/RangePartitionCachingLocalSegmentAllocatorTest.java | 3 ++- .../HashPartitionCachingLocalSegmentAllocatorTest.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java index 30925e32e7e3..dad9893a6b04 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java @@ -143,8 +143,9 @@ public void allocatesCorrectShardSpecsForLastPartition() } @Test - public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() + public void getSequenceName() { + // getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow Interval interval = INTERVAL_NORMAL; InputRow row = createInputRow(interval, PARTITION9); String sequenceName = target.getSequenceName(interval, row); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java index 2a868c858273..6230c8a888c6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java @@ -104,8 +104,9 @@ public void allocatesCorrectShardSpec() throws IOException @Test - public void test_getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow() + public void 
getSequenceName() { + // getSequenceName_forIntervalAndRow_shouldUseISOFormatAndPartitionNumForRow InputRow row = createInputRow(); String sequenceName = target.getSequenceName(INTERVAL, row); String expectedSequenceName = StringUtils.format("%s_%s_%d", TASKID, INTERVAL, PARTITION_NUM);