Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -361,34 +363,43 @@ public IndexedInts getRow()
final int[][] dims = currEntry.getKey().getDims();

int[] indices = dimIndex < dims.length ? dims[dimIndex] : null;
if (indices == null) {
indices = new int[0];
}
// check for null entry
if (indices.length == 0 && dimValLookup.contains(null)) {
indices = new int[] { dimValLookup.getId(null) };
}

final int[] vals = indices;
List<Integer> valsTmp = null;
if ((indices == null || indices.length == 0) && dimValLookup.contains(null)) {
int id = dimValLookup.getId(null);
if (id < maxId) {
valsTmp = new ArrayList<>(1);
valsTmp.add(id);
}
} else if (indices != null && indices.length > 0) {
valsTmp = new ArrayList<>(indices.length);
for (int i = 0; i < indices.length; i++) {
int id = indices[i];
if (id < maxId) {
valsTmp.add(id);
}
}
}

final List<Integer> vals = valsTmp == null ? Collections.EMPTY_LIST : valsTmp;
return new IndexedInts()
{
@Override
public int size()
{
return vals.length;
return vals.size();
}

@Override
public int get(int index)
{
return vals[index];
return vals.get(index);
}

@Override
public Iterator<Integer> iterator()
{
return Ints.asList(vals).iterator();
return vals.iterator();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.segment.incremental;

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -46,6 +47,8 @@
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand All @@ -54,6 +57,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -376,4 +380,76 @@ public void testFilterByNull() throws Exception
MapBasedRow row = (MapBasedRow) results.get(0);
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
}

@Test
public void testCursoringAndIndexUpdationInterleaving() throws Exception
{
final IncrementalIndex index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();

for (int i = 0; i < 2; i++) {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v1" + i)
)
);
}

final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);

Sequence<Cursor> cursors = sa.makeCursors(
null, new Interval(timestamp - 60_000, timestamp + 60_000), QueryGranularity.ALL, false
);

Sequences.toList(
Sequences.map(
cursors,
new Function<Cursor, Object>()
{
@Nullable
@Override
public Object apply(Cursor cursor)
{
DimensionSelector dimSelector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
int cardinality = dimSelector.getValueCardinality();

//index gets more rows at this point, while other thread is iterating over the cursor
try {
for (int i = 0; i < 1; i++) {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v2" + i)
)
);
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}

// and then, cursoring continues in the other thread
while (!cursor.isDone()) {
IndexedInts row = dimSelector.getRow();
for (int i : row) {
Assert.assertTrue(i < cardinality);
}
cursor.advance();
}

return null;
}
}
),
new ArrayList<>()
);
}
}