@@ -110,15 +110,29 @@ public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
// QueryGranularity was SECOND, now we will change it to HOUR
// QueryGranularity was SECOND, now we will change it to HOUR (QueryGranularity changed to a coarser granularity)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR);

// The original 4 segments should be compacted into 2 new segments
// The original 4 segments should be compacted into 2 new segments since data only has 2 days and the compaction
// segmentGranularity is DAY
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);

// QueryGranularity was HOUR, now we will change it to MINUTE (QueryGranularity changed to a finer granularity)
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.MINUTE);

// There will be no change in the number of segments as the compaction segmentGranularity is the same and the data
// interval is the same. Since QueryGranularity is changed to a finer granularity, the data will remain the same.
// (Data will just be bucketed to a finer granularity, but rollup will not be different,
// i.e. 2020-10-29T05:00 will just be bucketed to 2020-10-29T05:00:00.)
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.MINUTE.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
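To make the coarser-vs-finer re-bucketing in the comments above concrete, here is a minimal standalone sketch. It uses plain java.time truncation instead of Druid's Granularity classes (an assumption made so the snippet runs without Druid on the classpath): re-bucketing hour-aligned rows at MINUTE granularity changes nothing, while DAY truncation moves rows into wider buckets.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class GranularityBucketingSketch
{
  public static void main(String[] args)
  {
    // A timestamp already rolled up to HOUR granularity, as in the test above.
    Instant hourBucketed = Instant.parse("2020-10-29T05:00:00Z");

    // Coarser re-bucketing (DAY) moves the row into a wider bucket, so rows
    // can merge and rollup can change.
    Instant dayBucket = hourBucketed.truncatedTo(ChronoUnit.DAYS);       // 2020-10-29T00:00:00Z

    // Finer re-bucketing (MINUTE) is a no-op for hour-aligned data: the value
    // already sits on a minute boundary, so rollup stays the same.
    Instant minuteBucket = hourBucketed.truncatedTo(ChronoUnit.MINUTES); // 2020-10-29T05:00:00Z

    System.out.println(dayBucket + " vs " + minuteBucket);
  }
}
```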

@@ -22,9 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;

import java.util.Objects;

/**
* Spec containing Granularity configs for the Compaction Task.
* For the fields supported by the compaction task, this class mimics the JSON field names of the
* corresponding fields in {@link GranularitySpec}.
* This is done for end-user ease of use: end users set Granularity configs for a Compaction task
* with the same syntax / JSON structure as they would for any other ingestion task.
* Note that this class is not the same as {@link GranularitySpec}. This class simply holds Granularity configs
* and uses them to generate index task specs (the Compaction task internally creates an index task).
* This class does not do bucketing, group events, or know how to partition data.
*/
public class ClientCompactionTaskGranularitySpec
{
private final Granularity segmentGranularity;
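The Javadoc above describes a plain holder of granularity configs whose JSON field names mirror the ingestion granularitySpec. As a rough sketch of that shape (the queryGranularity field and the class name are assumptions for illustration; only segmentGranularity is visible in this diff):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;

// Illustrative only: a config holder in the style the Javadoc describes.
// It holds granularities and exposes granularitySpec-style JSON field names,
// but does no bucketing or partitioning itself.
public class GranularityConfigHolderSketch
{
  private final Granularity segmentGranularity;
  private final Granularity queryGranularity; // assumed field, not shown in this diff

  @JsonCreator
  public GranularityConfigHolderSketch(
      @JsonProperty("segmentGranularity") Granularity segmentGranularity,
      @JsonProperty("queryGranularity") Granularity queryGranularity
  )
  {
    this.segmentGranularity = segmentGranularity;
    this.queryGranularity = queryGranularity;
  }

  @JsonProperty
  public Granularity getSegmentGranularity()
  {
    return segmentGranularity;
  }

  @JsonProperty
  public Granularity getQueryGranularity()
  {
    return queryGranularity;
  }
}
```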
@@ -333,11 +333,11 @@ public AppenderatorAddResult add(
// persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Flushing in-memory data to disk because %s.", String.join(",", persistReasons));

long bytesPersisted = 0L;
long bytesToBePersisted = 0L;
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
final Sink sinkEntry = entry.getValue();
if (sinkEntry != null) {
bytesPersisted += sinkEntry.getBytesInMemory();
bytesToBePersisted += sinkEntry.getBytesInMemory();
if (sinkEntry.swappable()) {
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
@@ -347,7 +347,7 @@ public AppenderatorAddResult add(
}
}

if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesPersisted > maxBytesTuningConfig) {
if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig) {
// We are still over maxBytesTuningConfig even after persisting.
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format(
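The rename from bytesPersisted to bytesToBePersisted captures that these bytes are counted before the persist completes. A simplified, self-contained sketch of the check (Sink is reduced to a stand-in interface, and the real Appenderator carries much more state; names like maxBytesTuningConfig mirror the diff):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the Appenderator's Sink; only the method used here is kept.
interface Sink
{
  long getBytesInMemory();
}

public class MemoryOverheadCheckSketch
{
  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();

  // Returns true when, even after everything persistable is flushed, the remaining
  // in-memory overhead would still exceed the configured limit.
  boolean stillOverLimitAfterPersist(Map<String, Sink> sinks, long maxBytesTuningConfig)
  {
    long bytesToBePersisted = 0L;
    for (Sink sink : sinks.values()) {
      if (sink != null) {
        // Counted now, but these bytes are only freed once the persist completes;
        // hence the rename from bytesPersisted to bytesToBePersisted.
        bytesToBePersisted += sink.getBytesInMemory();
      }
    }
    return bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig;
  }
}
```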
@@ -22,9 +22,19 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;

import java.util.Objects;

/**
* Spec containing Granularity configs for Auto Compaction.
* For the fields supported in auto compaction, this class mimics the JSON field names of the
* corresponding fields in {@link GranularitySpec}.
* This is done for end-user ease of use: end users set Granularity configs for Auto Compaction
* with the same syntax / JSON structure as they would for any other ingestion task.
* Note that this class is not the same as {@link GranularitySpec}. This class simply holds Granularity configs
* and passes them to the compaction task spec. This class does not do bucketing, group events, or know how to
* partition data.
*/
public class UserCompactionTaskGranularityConfig
{
private final Granularity segmentGranularity;
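Because the config mirrors the granularitySpec field names, the user-facing JSON round-trips with the familiar shape. A hedged sketch of that round trip (String stands in for Druid's Granularity type so plain Jackson suffices; the exact JSON accepted by auto compaction is not shown in this diff):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AutoCompactionGranularityJsonSketch
{
  // String stands in for Druid's Granularity so the sketch runs with plain Jackson.
  private final String segmentGranularity;
  private final String queryGranularity;

  @JsonCreator
  public AutoCompactionGranularityJsonSketch(
      @JsonProperty("segmentGranularity") String segmentGranularity,
      @JsonProperty("queryGranularity") String queryGranularity
  )
  {
    this.segmentGranularity = segmentGranularity;
    this.queryGranularity = queryGranularity;
  }

  @JsonProperty
  public String getSegmentGranularity()
  {
    return segmentGranularity;
  }

  @JsonProperty
  public String getQueryGranularity()
  {
    return queryGranularity;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    AutoCompactionGranularityJsonSketch spec = mapper.readValue(
        "{\"segmentGranularity\":\"DAY\",\"queryGranularity\":\"HOUR\"}",
        AutoCompactionGranularityJsonSketch.class
    );
    // Prints "DAY / HOUR": the same field names a user already knows from granularitySpec.
    System.out.println(spec.getSegmentGranularity() + " / " + spec.getQueryGranularity());
  }
}
```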