Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ public T next()
}

while (!pQueue.isEmpty() && Objects.equals(value, pQueue.peek().peek())) {
pQueue.remove();
PeekingIterator<T> same = pQueue.remove();
same.next();
if (same.hasNext()) {
pQueue.add(same);
}
}
counter++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.nested;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrays;
Expand Down Expand Up @@ -52,7 +53,7 @@
* for all literal writers, which for this type of writer entails building a local dictionary to map into the global
* dictionary ({@link #localDictionary}) and writes this unsorted localId to an intermediate integer column,
* {@link #intermediateValueWriter}.
*
* <p>
* When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the
* local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
* the unsorted local ids with the sorted ids and writing to the compressed id column writer
Expand Down Expand Up @@ -133,8 +134,15 @@ public void addValue(int row, Object val) throws IOException
fillNull(row);
}
final T value = processValue(val);
final int globalId = lookupGlobalId(value);
final int localId = localDictionary.add(globalId);
final int localId;
// null is always 0
if (value == null) {
localId = localDictionary.add(0);
} else {
final int globalId = lookupGlobalId(value);
Preconditions.checkArgument(globalId >= 0, "Value [%s] is not present in global dictionary", value);
localId = localDictionary.add(globalId);
}
Comment on lines +139 to +145
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it not okay to do this check inside of lookupGlobalId?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the shared function, lookupGlobalId has different implementations depending on the type of writer so would need to duplicate there

intermediateValueWriter.write(localId);
writeValue(value);
cursorPosition++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ public class GlobalDictionaryIdLookup
public GlobalDictionaryIdLookup()
{
  // Each per-type lookup map answers -1 (rather than a meaningless 0) for
  // values that are absent from the global dictionary, so callers can detect
  // a failed lookup.
  this.stringLookup = new Object2IntLinkedOpenHashMap<>();
  this.stringLookup.defaultReturnValue(-1);
  this.longLookup = new Long2IntLinkedOpenHashMap();
  this.longLookup.defaultReturnValue(-1);
  this.doubleLookup = new Double2IntLinkedOpenHashMap();
  this.doubleLookup.defaultReturnValue(-1);
}

public void addString(@Nullable String value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,12 @@ public void serializeStringDictionary(Iterable<String> dictionaryValues) throws
dictionaryWriter.write(null);
globalDictionaryIdLookup.addString(null);
for (String value : dictionaryValues) {
if (NullHandling.emptyToNullIfNeeded(value) == null) {
value = NullHandling.emptyToNullIfNeeded(value);
if (value == null) {
continue;
}

dictionaryWriter.write(value);
value = NullHandling.emptyToNullIfNeeded(value);
globalDictionaryIdLookup.addString(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ public String lookupName(int id)
return StringUtils.fromUtf8Nullable(globalDictionary.get(globalId));
} else if (globalId < globalDictionary.size() + globalLongDictionary.size()) {
return String.valueOf(globalLongDictionary.get(globalId - adjustLongId));
} else {
} else if (globalId < globalDictionary.size() + globalLongDictionary.size() + globalDoubleDictionary.size()) {
return String.valueOf(globalDoubleDictionary.get(globalId - adjustDoubleId));
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.nested;

import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
Expand All @@ -45,7 +46,10 @@ public StringFieldColumnWriter(
@Override
String processValue(Object value)
{
  // A null input stays null. Non-null inputs are stringified and then
  // normalized: when the current null-handling mode treats "" as null,
  // empty strings are coerced to null so they share the dictionary's null id.
  if (value == null) {
    return null;
  }
  return NullHandling.emptyToNullIfNeeded(String.valueOf(value));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
Expand All @@ -42,11 +43,15 @@
import org.apache.druid.timeline.SegmentId;
import org.junit.rules.TemporaryFolder;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NestedDataTestUtils
Expand All @@ -58,6 +63,9 @@ public class NestedDataTestUtils
public static final String SIMPLE_PARSER_TSV_TRANSFORM_FILE = "simple-nested-test-data-tsv-transform.json";
public static final String SIMPLE_AGG_FILE = "simple-nested-test-data-aggs.json";

public static final String TYPES_DATA_FILE = "types-test-data.json";
public static final String TYPES_PARSER_FILE = "types-test-data-parser.json";

public static final String NUMERIC_DATA_FILE = "numeric-nested-test-data.json";
public static final String NUMERIC_PARSER_FILE = "numeric-nested-test-data-parser.json";

Expand Down Expand Up @@ -160,6 +168,7 @@ public static List<Segment> createSegments(
maxRowCount,
rollup
);
inputDataStream.close();

final List<Segment> segments = Lists.transform(
ImmutableList.of(segmentDir),
Expand Down Expand Up @@ -207,6 +216,7 @@ public static List<Segment> createSegments(
maxRowCount,
rollup
);
inputDataStream.close();

final List<Segment> segments = Lists.transform(
ImmutableList.of(segmentDir),
Expand All @@ -223,6 +233,96 @@ public static List<Segment> createSegments(
return segments;
}

/**
 * Convenience overload of
 * {@code createSegmentsWithConcatenatedInput(helper, tempFolder, closer, inputFileName, ...)} that builds segments
 * from the default simple nested test data ({@code SIMPLE_DATA_FILE}), its parser spec
 * ({@code SIMPLE_PARSER_FILE}), and aggregator spec ({@code SIMPLE_AGG_FILE}), with no transform spec.
 */
public static List<Segment> createSegmentsWithConcatenatedInput(
    AggregationTestHelper helper,
    TemporaryFolder tempFolder,
    Closer closer,
    Granularity granularity,
    boolean rollup,
    int maxRowCount,
    int numCopies,
    int numSegments
) throws Exception
{
  return createSegmentsWithConcatenatedInput(
      helper,
      tempFolder,
      closer,
      SIMPLE_DATA_FILE,
      SIMPLE_PARSER_FILE,
      null,
      SIMPLE_AGG_FILE,
      granularity,
      rollup,
      maxRowCount,
      numCopies,
      numSegments
  );
}

/**
* turn small test data into bigger test data by duplicating itself into a bigger stream
*/
public static List<Segment> createSegmentsWithConcatenatedInput(
AggregationTestHelper helper,
TemporaryFolder tempFolder,
Closer closer,
String inputFileName,
String parserJsonFileName,
String transformSpecJsonFileName,
String aggJsonFileName,
Granularity granularity,
boolean rollup,
int maxRowCount,
int numCopies,
int numSegments
) throws Exception
{
String parserJson = readFileFromClasspathAsString(parserJsonFileName);
String transformSpecJson = transformSpecJsonFileName != null ? readFileFromClasspathAsString(transformSpecJsonFileName) : null;
String aggJson = readFileFromClasspathAsString(aggJsonFileName);

List<File> segmentDirs = Lists.newArrayListWithCapacity(numSegments);
for (int i = 0; i < numSegments; i++) {
List<InputStream> inputStreams = Lists.newArrayListWithCapacity(numCopies);
for (int j = 0; j < numCopies; j++) {
inputStreams.add(new FileInputStream(readFileFromClasspath(inputFileName)));
Comment thread Fixed
if (j + 1 < numCopies) {
inputStreams.add(new ByteArrayInputStream(StringUtils.toUtf8("\n")));
}
}
SequenceInputStream inputDataStream = new SequenceInputStream(Collections.enumeration(inputStreams));
File segmentDir = tempFolder.newFolder();
helper.createIndex(
inputDataStream,
parserJson,
transformSpecJson,
aggJson,
segmentDir,
0,
granularity,
maxRowCount,
rollup
);
inputDataStream.close();
segmentDirs.add(segmentDir);
}

final List<Segment> segments = Lists.transform(
segmentDirs,
dir -> {
try {
return closer.register(new QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy("")));
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
}
);

return segments;
}

public static Segment createIncrementalIndex(
String inputFileName,
String parserJsonFileName,
Expand Down Expand Up @@ -258,6 +358,7 @@ public static Segment createIncrementalIndex(
maxRowCount,
rollup
);
inputDataStream.close();
return new IncrementalIndexSegment(index, SegmentId.dummy("test_datasource"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void testNumericEvolutionFiltering(boolean doVectorize)

// Only nonexistent(4)
Assert.assertEquals(
timeseriesResult(TestHelper.createExpectedMap(
timeseriesResult(TestHelper.makeMap(
"a",
NullHandling.defaultLongValue(),
"b",
Expand Down
Loading