Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ad5441e
First set of changes for framework
somu-imply Sep 13, 2022
2c4ba0c
Second set of changes to move segment map function to data source
somu-imply Sep 13, 2022
311e252
Minot change to server manager
somu-imply Sep 14, 2022
fdf4829
Removing the createSegmentMapFunction from JoinableFactoryWrapper and…
somu-imply Sep 14, 2022
16fb8d9
Checkstyle fixes
somu-imply Sep 15, 2022
c078ce8
Patching Eric's fix for injection
somu-imply Sep 20, 2022
29fdea6
Checkstyle and fixing some CI issues
somu-imply Sep 22, 2022
f77be1b
Fixing code inspections and some failed tests and one injector for te…
somu-imply Sep 22, 2022
21405f1
Another set of changes for CI...almost there
somu-imply Sep 22, 2022
abe4973
Equals and hashcode part update
somu-imply Sep 22, 2022
149599d
Fixing injector from Eric + refactoring for broadcastJoinHelper
somu-imply Sep 27, 2022
63313f4
Updating second injector. Might revert later if better way found
somu-imply Sep 27, 2022
557081a
Fixing guice issue in JoinableFactory
somu-imply Sep 30, 2022
1b75daf
Addressing review comments part 1
somu-imply Oct 2, 2022
9da42a9
Temp changes refactoring
somu-imply Oct 3, 2022
4022054
Revert "Temp changes refactoring"
somu-imply Oct 3, 2022
e7bf434
temp
somu-imply Oct 6, 2022
af696a3
Temp discussions
somu-imply Oct 6, 2022
79d2db3
Refactoring temp
somu-imply Oct 11, 2022
38306f6
Refatoring the query rewrite to refer to a datasource
somu-imply Oct 12, 2022
a10fa76
Refactoring getCacheKey by moving it inside data source
somu-imply Oct 13, 2022
eaf5c4b
Nullable annotation check in injector
somu-imply Oct 14, 2022
1763b51
Addressing some comments, removing 2 analysis.isJoin() checks and cor…
somu-imply Oct 14, 2022
a8291f1
Minor changes for refactoring
somu-imply Oct 17, 2022
656934b
Addressing reviews part 1
somu-imply Oct 18, 2022
899add0
Refactoring part 2 with new test cases for broadcast join
somu-imply Oct 18, 2022
602c45a
Merge remote-tracking branch 'upstream/master' into unnest_v1
somu-imply Oct 18, 2022
0474963
Set for nullables
somu-imply Oct 19, 2022
e45e07e
removing instance of checks
somu-imply Oct 21, 2022
bef8349
Storing nullables in guice to avoid checking on reruns
somu-imply Oct 21, 2022
01a5b22
Merge remote-tracking branch 'upstream/master' into unnest_v1
somu-imply Oct 22, 2022
dd00ccd
Fixing a test case and removing an irrelevant line
somu-imply Oct 22, 2022
40d3724
Addressing the atomic reference review comments
somu-imply Oct 24, 2022
c50bb90
Merge remote-tracking branch 'upstream/master' into unnest_v1
somu-imply Oct 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.openjdk.jmh.infra.Blackhole;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -447,7 +446,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);
groupByQuery = GroupByQuery
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
Expand Down Expand Up @@ -96,14 +95,12 @@ public ObjectMapper getSmileMapper(Injector injector)
return smileMapper;
}

@VisibleForTesting
public static void setupJackson(Injector injector, final ObjectMapper mapper)
{
mapper.setInjectableValues(new GuiceInjectableValues(injector));
setupAnnotationIntrospector(mapper, new GuiceAnnotationIntrospector());
}

@VisibleForTesting
public static void setupAnnotationIntrospector(
final ObjectMapper mapper,
final AnnotationIntrospector annotationIntrospector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.google.inject.ConfigurationException;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.java.util.common.IAE;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReference;

/**
*
*/
public class GuiceInjectableValues extends InjectableValues
{
private final Injector injector;
private final AtomicReference<HashSet<Key>> nullables;

public GuiceInjectableValues(Injector injector)
{
this.injector = injector;
this.nullables = new AtomicReference<>(new HashSet<>());
}

@Override
Expand All @@ -49,8 +57,22 @@ public Object findInjectableValue(
// whatever provider needs"
// Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
// great care
if (valueId instanceof Key) {
return injector.getInstance((Key) valueId);
if (nullables.get().contains((Key) valueId)) {
return null;
} else if (valueId instanceof Key) {
try {
return injector.getInstance((Key) valueId);
}
catch (ConfigurationException ce) {
// check if nullable annotation is present for this
if (forProperty.getAnnotation(Nullable.class) != null) {
HashSet<Key> encounteredNullables = new HashSet<>(nullables.get());
encounteredNullables.add((Key) valueId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non-thread-safe mutation of the HashSet. The goal of the AtomicReference was to make thinds thread-safe. You must create a brand new Set, add all of the old values, add the new value and then set the new reference on the AtomicReference.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

nullables.set(encounteredNullables);
return null;
}
throw ce;
}
}
throw new IAE(
"Unknown class type [%s] for valueId [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
Expand Down Expand Up @@ -88,6 +89,58 @@ protected BaseLeafFrameProcessor(
this.broadcastJoinHelper = inputChannelsAndBroadcastJoinHelper.rhs;
}

/**
* Helper that enables implementations of {@link BaseLeafFrameProcessorFactory} to set up their primary and side channels.
*/
private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputChannelsAndBroadcastJoinHelper(
final DataSource dataSource,
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
final long memoryReservedForBroadcastJoin
)
{
if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
}

final List<ReadableFrameChannel> inputChannels = new ArrayList<>();
final BroadcastJoinHelper broadcastJoinHelper;

if (baseInput.hasChannel()) {
inputChannels.add(baseInput.getChannel());
}

if (dataSource instanceof JoinDataSource) {
final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap();
final List<FrameReader> channelReaders = new ArrayList<>();

if (baseInput.hasChannel()) {
// BroadcastJoinHelper doesn't need to read the base channel, so stub in a null reader.
channelReaders.add(null);
}

for (Int2ObjectMap.Entry<ReadableInput> sideChannelEntry : sideChannels.int2ObjectEntrySet()) {
final int inputNumber = sideChannelEntry.getIntKey();
inputNumberToProcessorChannelMap.put(inputNumber, inputChannels.size());
inputChannels.add(sideChannelEntry.getValue().getChannel());
channelReaders.add(sideChannelEntry.getValue().getChannelFrameReader());
}

broadcastJoinHelper = new BroadcastJoinHelper(
inputNumberToProcessorChannelMap,
inputChannels,
channelReaders,
joinableFactory,
memoryReservedForBroadcastJoin
);
} else {
broadcastJoinHelper = null;
}

return Pair.of(inputChannels, broadcastJoinHelper);
}

@Override
public List<ReadableFrameChannel> inputChannels()
{
Expand Down Expand Up @@ -146,71 +199,19 @@ protected SegmentReference mapSegment(final Segment segment)

private boolean initializeSegmentMapFn(final IntSet readableInputs)
{
final AtomicLong cpuAccumulator = new AtomicLong();
if (segmentMapFn != null) {
return true;
} else if (broadcastJoinHelper == null) {
segmentMapFn = Function.identity();
return true;
} else {
final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);

DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
if (retVal) {
segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);
}

return retVal;
}
}

/**
* Helper that enables implementations of {@link BaseLeafFrameProcessorFactory} to set up their primary and side channels.
*/
private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputChannelsAndBroadcastJoinHelper(
final DataSource dataSource,
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
final long memoryReservedForBroadcastJoin
)
{
if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
}

final List<ReadableFrameChannel> inputChannels = new ArrayList<>();
final BroadcastJoinHelper broadcastJoinHelper;

if (baseInput.hasChannel()) {
inputChannels.add(baseInput.getChannel());
}

if (dataSource instanceof JoinDataSource) {
final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap();
final List<FrameReader> channelReaders = new ArrayList<>();

if (baseInput.hasChannel()) {
// BroadcastJoinHelper doesn't need to read the base channel, so stub in a null reader.
channelReaders.add(null);
}

for (Int2ObjectMap.Entry<ReadableInput> sideChannelEntry : sideChannels.int2ObjectEntrySet()) {
final int inputNumber = sideChannelEntry.getIntKey();
inputNumberToProcessorChannelMap.put(inputNumber, inputChannels.size());
inputChannels.add(sideChannelEntry.getValue().getChannel());
channelReaders.add(sideChannelEntry.getValue().getChannelFrameReader());
}

broadcastJoinHelper = new BroadcastJoinHelper(
inputNumberToProcessorChannelMap,
inputChannels,
channelReaders,
joinableFactory,
memoryReservedForBroadcastJoin
);
} else {
broadcastJoinHelper = null;
}

return Pair.of(inputChannels, broadcastJoinHelper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.msq.querykit;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
Expand All @@ -32,18 +31,12 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactoryWrapper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BroadcastJoinHelper
Expand Down Expand Up @@ -138,21 +131,9 @@ public IntSet getSideChannelNumbers()
return sideChannelNumbers;
}

public Function<SegmentReference, SegmentReference> makeSegmentMapFn(final Query<?> query)
{
final DataSource dataSourceWithInlinedChannelData = inlineChannelData(query.getDataSource());
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSourceWithInlinedChannelData);

return joinableFactory.createSegmentMapFn(
analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
analysis.getPreJoinableClauses(),
new AtomicLong(),
analysis.getBaseQuery().orElse(query)
);
}

@VisibleForTesting
DataSource inlineChannelData(final DataSource originalDataSource)

public DataSource inlineChannelData(final DataSource originalDataSource)
{
if (originalDataSource instanceof InputNumberDataSource) {
final int inputNumber = ((InputNumberDataSource) originalDataSource).getInputNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ private static DataSourcePlan forJoin(
clause.getPrefix(),
clause.getCondition(),
clause.getJoinType(),

// First JoinDataSource (i == 0) involves the base table, so we need to propagate the base table filter.
i == 0 ? analysis.getJoinBaseTableFilter().orElse(null) : null
i == 0 ? analysis.getJoinBaseTableFilter().orElse(null) : null,
dataSource.getJoinableFactoryWrapper()
);
inputSpecs.addAll(clausePlan.getInputSpecs());
clausePlan.getBroadcastInputs().intStream().forEach(n -> broadcastInputs.add(n + shift));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@JsonTypeName("inputNumber")
public class InputNumberDataSource implements DataSource
Expand Down Expand Up @@ -81,6 +85,27 @@ public boolean isConcrete()
return false;
}

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
Query query,
AtomicLong cpuTimeAcc
)
{
return Function.identity();
}

@Override
public DataSource withUpdatedDataSource(DataSource newSource)
{
return newSource;
}

@Override
public byte[] getCacheKey()
{
return null;
}

@JsonProperty
public int getInputNumber()
{
Expand Down
Loading