Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
Expand Down Expand Up @@ -775,6 +776,59 @@ public void testAutoCompactionDutyWithFilter() throws Exception
}
}

@Test
public void testAutoCompactionDutyWithOverlappingInterval() throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.NONE, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
// Create WEEK segment with 2013-08-26 to 2013-09-02
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.NONE, false, ImmutableList.of(new Interval("2013-09-01/2013-09-02", chrono))));
// Create MONTH segment with 2013-09-01 to 2013-10-01
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);

try (final Closeable ignored = unloader(fullDatasourceName)) {
verifySegmentsCount(2);

// Result is not rollup
// For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
Map<String, Object> expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%",
2,
"%%EXPECTED_SCAN_RESULT%%",
ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
null,
null,
null,
false
);
// Compact the MONTH segment
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

// Compact the WEEK segment
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

// Verify all task succeed
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
for (TaskResponseObject taskResponseObject : compactTasksBefore) {
Assert.assertEquals(TaskState.SUCCESS, taskResponseObject.getStatus());
}

// Verify compacted segments does not get compacted again
forceTriggerAutoCompaction(2);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}

private void loadData(String indexTask) throws Exception
{
loadData(indexTask, ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

Expand Down Expand Up @@ -51,7 +50,7 @@ public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segmen
JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
),
SegmentUtils.hashIds(segments)
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
Expand Down Expand Up @@ -343,18 +345,37 @@ private CoordinatorStats doRun(
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);

// Create granularitySpec to send to compaction task
ClientCompactionTaskGranularitySpec granularitySpec;
if (config.getGranularitySpec() != null) {
granularitySpec = new ClientCompactionTaskGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup()

);
Granularity segmentGranularityToUse = null;
if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
// Determines segmentGranularity from the segmentsToCompact
// Each batch of segmentToCompact from CompactionSegmentIterator will contains the same interval as
// segmentGranularity is not set in the compaction config
Interval interval = segmentsToCompact.get(0).getInterval();
if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) {
try {
segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
}
catch (IAE iae) {
// This case can happen if the existing segment interval result in complicated periods.
// Fall back to setting segmentGranularity as null
LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
}
} else {
LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
}
} else {
granularitySpec = null;
segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
}
granularitySpec = new ClientCompactionTaskGranularitySpec(
segmentGranularityToUse,
config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null

);

// Create dimensionsSpec to send to compaction task
ClientCompactionTaskDimensionsSpec dimensionsSpec;
if (config.getDimensionsSpec() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
Expand All @@ -48,6 +53,7 @@ public class HttpIndexingServiceClientTest
private HttpIndexingServiceClient httpIndexingServiceClient;
private ObjectMapper jsonMapper;
private DruidLeaderClient druidLeaderClient;
private ObjectMapper mockMapper;

@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand All @@ -57,6 +63,8 @@ public void setup()
{
jsonMapper = new DefaultObjectMapper();
druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
mockMapper = EasyMock.createMock(ObjectMapper.class);

httpIndexingServiceClient = new HttpIndexingServiceClient(
jsonMapper,
druidLeaderClient
Expand Down Expand Up @@ -268,5 +276,70 @@ public void testGetTaskReportEmpty() throws Exception
EasyMock.verify(druidLeaderClient, response);
}

@Test
public void testCompact() throws Exception
{
DataSegment segment = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"),
null,
null,
NoneShardSpec.instance(),
0,
1
);
Capture captureTask = EasyMock.newCapture();
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);

StringFullResponseHolder responseHolder = new StringFullResponseHolder(
response,
StandardCharsets.UTF_8
).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa")));

EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task"))
.andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task")))
.anyTimes();
EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
.andReturn(responseHolder)
.anyTimes();
EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask)))
.andReturn(new byte[]{1, 2, 3})
.anyTimes();
EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)))
.andReturn(ImmutableMap.of())
.anyTimes();
EasyMock.replay(druidLeaderClient, mockMapper);

HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient(
mockMapper,
druidLeaderClient
);

try {
httpIndexingServiceClient.compactSegments(
"test-compact",
ImmutableList.of(segment),
50,
null,
null,
null,
null,
null,
null
);
}
catch (Exception e) {
// Ignore IllegalStateException as taskId is internally generated and returned task id will failed check
Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
}

ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
}
}

Loading