Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.apache.druid.segment.join.table;

import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -52,7 +50,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -67,9 +64,13 @@ public class BroadcastSegmentIndexedTable implements IndexedTable
private final Set<String> keyColumns;
private final RowSignature rowSignature;
private final String version;
private final List<Map<Object, IntList>> keyColumnsIndex;
private final List<Index> keyColumnsIndexes;

public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, final Set<String> keyColumns, final String version)
public BroadcastSegmentIndexedTable(
final QueryableIndexSegment theSegment,
final Set<String> keyColumns,
final String version
)
{
this.keyColumns = keyColumns;
this.version = version;
Expand All @@ -92,19 +93,22 @@ public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, fina
}
this.rowSignature = sigBuilder.build();

// initialize keycolumn index maps
this.keyColumnsIndex = new ArrayList<>(rowSignature.size());
// initialize keycolumn index builders
final ArrayList<RowBasedIndexBuilder> indexBuilders = new ArrayList<>(rowSignature.size());
final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
for (int i = 0; i < rowSignature.size(); i++) {
final Map<Object, IntList> m;
final RowBasedIndexBuilder m;
final String columnName = rowSignature.getColumnName(i);
if (keyColumns.contains(columnName)) {
m = new HashMap<>();
final ValueType keyType =
rowSignature.getColumnType(i).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);

m = new RowBasedIndexBuilder(keyType);
keyColumnNames.add(columnName);
} else {
m = null;
}
keyColumnsIndex.add(m);
indexBuilders.add(m);
}

// sort of like the dump segment tool, but build key column indexes when reading the segment
Expand Down Expand Up @@ -143,12 +147,8 @@ public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, fina
for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < selectors.size(); keyColumnSelectorIndex++) {
final String keyColumnName = keyColumnNames.get(keyColumnSelectorIndex);
final int columnPosition = rowSignature.indexOf(keyColumnName);
final Map<Object, IntList> keyColumnValueIndex = keyColumnsIndex.get(columnPosition);
final Object key = selectors.get(keyColumnSelectorIndex).getObject();
if (key != null) {
final IntList array = keyColumnValueIndex.computeIfAbsent(key, k -> new IntArrayList());
array.add(rowNumber);
}
final RowBasedIndexBuilder keyColumnIndexBuilder = indexBuilders.get(columnPosition);
keyColumnIndexBuilder.add(selectors.get(keyColumnSelectorIndex).getObject());
}

if (rowNumber % 100_000 == 0) {
Expand All @@ -166,6 +166,11 @@ public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, fina
);

Integer totalRows = sequence.accumulate(0, (accumulated, in) -> accumulated += in);

this.keyColumnsIndexes = indexBuilders.stream()
.map(builder -> builder != null ? builder.build() : null)
.collect(Collectors.toList());

LOG.info("Created BroadcastSegmentIndexedTable with %s rows.", totalRows);
}

Expand Down Expand Up @@ -196,7 +201,7 @@ public int numRows()
/**
* Returns the key-column {@link Index} for the column at the given position.
*
* Delegates to {@code RowBasedIndexedTable.getKeyColumnIndex} using the per-column indexes that were built once
* in the constructor. Positions that are not key columns hold a null entry in that list; how the helper reports
* such positions is not visible here -- presumably it rejects them, verify against RowBasedIndexedTable.
*
* @param column position of the column in this table's row signature
* @return the index for the requested key column
*/
@Override
public Index columnIndex(int column)
{
// Exactly one lookup, against the prebuilt List<Index>. The former second return statement was unreachable
// (a compile error), and the map-based three-argument lookup it duplicated is superseded by this call.
return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndexes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ReferenceCountedObject;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ReadableOffset;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -92,10 +93,37 @@ default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, b
*/
interface Index
{
/**
* Sentinel row number returned by {@link #findUniqueLong(long)} when the key does not exist in the index.
*/
int NOT_FOUND = -1;

/**
* Returns the natural key type for the index.
*/
ValueType keyType();

/**
* Returns whether keys are unique in this index. If this returns true, then {@link #find(Object)} will only ever
* return a zero- or one-element list.
*/
boolean areKeysUnique();

/**
* Returns the list of row numbers corresponding to "key" in this index.
*
* If "key" is some type other than the natural type {@link #keyType()}, it will be converted before checking
* the index.
*
* When {@link #areKeysUnique()} returns true, the returned list has at most one element.
*
* @param key the key to look up; may be of a type other than {@link #keyType()}
* @return row numbers associated with the key
*/
IntList find(Object key);

/**
* Returns the row number corresponding to "key" in this index, or {@link #NOT_FOUND} if the key does not exist
* in the index.
*
* It is only valid to call this method if {@link #keyType()} is {@link ValueType#LONG} and {@link #areKeysUnique()}
* returns true.
*
* @param key the long key to look up
* @return the single matching row number, or {@link #NOT_FOUND}
* @throws UnsupportedOperationException if preconditions are not met
*/
int findUniqueLong(long key);
}

/**
Expand Down
Loading