Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fd20a2f
Add SegmentAllocationQueue to batch allocation actions
kfaraz Nov 15, 2022
54fa8cc
Retain old code in SegmentAllocateAction
kfaraz Nov 15, 2022
401e04a
Add tests, fix concurrency bugs
kfaraz Nov 16, 2022
8882596
Fix bugs, update test
kfaraz Nov 16, 2022
da36e53
Fix tests and bugs
kfaraz Nov 16, 2022
d4de20b
Fix bug
kfaraz Nov 16, 2022
8f45a9c
Put new flow behind a feature flag
kfaraz Nov 16, 2022
ab26658
Fix forbidden api usage
kfaraz Nov 16, 2022
5c687ce
Remove forbidden API usage
kfaraz Nov 16, 2022
a9574f5
Fix compilation error due to GuardedBy
kfaraz Nov 16, 2022
0524e83
Revert suspicious indentation
kfaraz Nov 16, 2022
22ca1f3
Fix problematic use of leaderSelector.registerListener
kfaraz Nov 17, 2022
bdad64d
Schedule queue processing only if batching is enabled
kfaraz Nov 17, 2022
b546f6c
Remove unused method
kfaraz Nov 17, 2022
ea0b96f
Batch requests of same task group, same allocation interval
kfaraz Nov 17, 2022
f6b8f77
Add failed metric
kfaraz Nov 17, 2022
c5862e7
Fix possible concurrent modification bug
kfaraz Nov 17, 2022
4c66afd
Fix interval search
kfaraz Nov 17, 2022
e07e852
Deduplicate requests while creating new segments and updating metadat…
kfaraz Nov 18, 2022
829628c
Mark success and failure correctly
kfaraz Nov 18, 2022
3a85568
Minor fixes
kfaraz Nov 18, 2022
b6da3c4
Limit queue size
kfaraz Nov 21, 2022
1d7a688
Add IntervalsTest and test for IndexerSQLMSC
kfaraz Nov 23, 2022
ef44738
Merge branch 'master' of github.com:apache/druid into seg_alloc_queue
kfaraz Nov 29, 2022
55676cc
Add tests, fix success bug
kfaraz Nov 29, 2022
39a97fd
Improve leadership check
kfaraz Nov 29, 2022
5e0cdca
Add more tests
kfaraz Nov 29, 2022
0ed6562
Fix checkstyle
kfaraz Nov 29, 2022
92479da
Add TaskAction.canPerformAsync()
kfaraz Nov 29, 2022
295e96d
Fix interval search
kfaraz Nov 29, 2022
a03ba2e
Merge branch 'master' of github.com:apache/druid into seg_alloc_queue
kfaraz Nov 30, 2022
0d09a6e
Fix behaviour on being elected leader
kfaraz Dec 1, 2022
c132748
Fix tests
kfaraz Dec 2, 2022
d472152
Keep keys in queue rather than the batch
kfaraz Dec 2, 2022
29bc0d4
Set default batchAllocationMaxWaitTime to 500ms
kfaraz Dec 2, 2022
784e6d2
Set batchAllocation to true for testing
kfaraz Dec 2, 2022
602c39e
Reduce max wait time to 50ms
kfaraz Dec 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
package org.apache.druid.java.util.common;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

import javax.annotation.Nullable;

public final class Intervals
{
public static final Interval ETERNITY = utc(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT);
Expand Down Expand Up @@ -68,6 +71,32 @@ public static boolean isEternity(final Interval interval)
return ETERNITY.equals(interval);
}

/**
* Finds an interval from the given set of sortedIntervals which overlaps with
* the searchInterval. If multiple candidate intervals overlap with the
* searchInterval, the "smallest" interval based on the
* {@link Comparators#intervalsByStartThenEnd()} is returned.
*
* @param searchInterval Interval which should overlap with the result
* @param sortedIntervals Candidate overlapping intervals, sorted in ascending
* order, using {@link Comparators#intervalsByStartThenEnd()}.
* @return The first overlapping interval, if one exists, otherwise null.
*/
@Nullable
public static Interval findOverlappingInterval(Interval searchInterval, Interval[] sortedIntervals)
{
for (Interval interval : sortedIntervals) {
if (interval.overlaps(searchInterval)) {
return interval;
} else if (interval.getStart().isAfter(searchInterval.getEnd())) {
// Intervals after this cannot have an overlap
return null;
}
}

return null;
}

private Intervals()
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common;

import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

public class IntervalsTest
{

@Test
public void testFindOverlappingInterval()
{
final Interval[] sortedIntervals = new Interval[]{
Intervals.of("2019/2020"),
Intervals.of("2021/2022"),
Intervals.of("2021-04-01/2021-05-01"),
Intervals.of("2022/2023")
};
Arrays.sort(sortedIntervals, Comparators.intervalsByStartThenEnd());

// Search interval outside the bounds of the sorted intervals
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2018/2019"), sortedIntervals)
);
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2023/2024"), sortedIntervals)
);

// Search interval within bounds, overlap exists
// Fully overlapping interval
Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021/2022"), sortedIntervals)
);

// Partially overlapping interval
Assert.assertEquals(
Intervals.of("2022/2023"),
Intervals.findOverlappingInterval(Intervals.of("2022-01-01/2022-01-02"), sortedIntervals)
);

Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021-06-01/2021-07-01"), sortedIntervals)
);

// Overlap with multiple intervals, "smallest" one is returned
Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021-03-01/2021-04-01"), sortedIntervals)
);

// Search interval within bounds, no overlap
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2020-01-02/2020-03-03"), sortedIntervals)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.indexing.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -329,14 +328,7 @@ public void publishSegments(Iterable<DataSegment> segments) throws IOException
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
}
DataSegment::getInterval
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class LocalTaskActionClient implements TaskActionClient
{
Expand Down Expand Up @@ -76,11 +77,28 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction)
}

final long performStartTime = System.currentTimeMillis();
final RetType result = taskAction.perform(task, toolbox);
final RetType result = performAction(taskAction);
emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime);
return result;
}

private <R> R performAction(TaskAction<R> taskAction)
{
try {
final R result;
if (taskAction.canPerformAsync(task, toolbox)) {
result = taskAction.performAsync(task, toolbox).get(5, TimeUnit.MINUTES);
} else {
result = taskAction.perform(task, toolbox);
}

return result;
}
catch (Throwable t) {
throw new RuntimeException(t);
}
}

private void emitTimerMetric(final String metric, final TaskAction<?> action, final long time)
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -180,6 +181,23 @@ public TypeReference<SegmentIdWithShardSpec> getReturnTypeReference()
};
}

@Override
public boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return toolbox.canBatchSegmentAllocation();
}

@Override
public Future<SegmentIdWithShardSpec> performAsync(Task task, TaskActionToolbox toolbox)
{
if (!toolbox.canBatchSegmentAllocation()) {
throw new ISE("Batched segment allocation is disabled");
}
return toolbox.getSegmentAllocationQueue().add(
new SegmentAllocateRequest(task, this, MAX_ATTEMPTS)
);
}

@Override
public SegmentIdWithShardSpec perform(
final Task task,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import org.apache.druid.indexing.common.task.Task;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

/**
* Request received by the overlord for segment allocation.
*/
public class SegmentAllocateRequest
{
private final Task task;
private final SegmentAllocateAction action;
private final int maxAttempts;
private final Interval rowInterval;

private int attempts;

public SegmentAllocateRequest(Task task, SegmentAllocateAction action, int maxAttempts)
{
this.task = task;
this.action = action;
this.maxAttempts = maxAttempts;
this.rowInterval = action.getQueryGranularity()
.bucket(action.getTimestamp())
.withChronology(ISOChronology.getInstanceUTC());
}

public Task getTask()
{
return task;
}

public SegmentAllocateAction getAction()
{
return action;
}

public void incrementAttempts()
{
++attempts;
}

public boolean canRetry()
{
return attempts < maxAttempts;
}

public int getAttempts()
{
return attempts;
}

public Interval getRowInterval()
{
return rowInterval;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

public class SegmentAllocateResult
{
private final SegmentIdWithShardSpec segmentId;
private final String errorMessage;

public SegmentAllocateResult(SegmentIdWithShardSpec segmentId, String errorMessage)
{
this.segmentId = segmentId;
this.errorMessage = errorMessage;
}

public SegmentIdWithShardSpec getSegmentId()
{
return segmentId;
}

public String getErrorMessage()
{
return errorMessage;
}

public boolean isSuccess()
{
return segmentId != null;
}
}
Loading