Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.benchmark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
Expand All @@ -31,6 +32,7 @@
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
Expand Down Expand Up @@ -67,6 +69,8 @@
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
Expand Down Expand Up @@ -190,19 +194,34 @@ public void setup() throws IOException
final ExprMacroTable exprMacroTable = new ExprMacroTable(
ImmutableList.of(
new LookupExprMacro(
lookupName -> {
if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryCodeToNameMap, false)
);
} else if (LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryNumberToNameMap, false)
);
} else {
return null;
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return ImmutableSet.of(LOOKUP_COUNTRY_CODE_TO_NAME, LOOKUP_COUNTRY_NUMBER_TO_NAME);
}

@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryCodeToNameMap, false)
)
);
} else if (LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryNumberToNameMap, false)
)
);
} else {
return Optional.empty();
}
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
LookupEnabledTestExprMacroTable.createTestLookupProvider(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
LookupEnabledTestExprMacroTable.createTestLookupProvider(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,66 @@
}
]
},
{
"description": "topN, 1 agg, join to lookup",
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": "wikipedia_editstream",
"right": {
"type": "lookup",
"lookup": "wiki-simple"
},
"rightPrefix": "j.",
"condition": "page == \"j.k\"",
"joinType": "LEFT"
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupPage",
"expression": "nvl(\"j.v\", \"page\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimension": "lookupPage",
"metric": "rows",
"threshold": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"timestamp": "2013-01-01T00:00:00.000Z",
"result": [
{
"lookupPage": "lookup!",
"rows": 991
},
{
"lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows": 990
},
{
"lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
"rows": 800
}
]
}
]
},
{
"description": "topN, 1 agg, join to inline",
"query": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DimFilterUtils;
Expand Down Expand Up @@ -136,13 +137,17 @@ public String getName()
@Override
public ExtractionFn getExtractionFn()
{
final LookupExtractor lookupExtractor = Strings.isNullOrEmpty(name)
? this.lookup
: Preconditions.checkNotNull(
lookupExtractorFactoryContainerProvider.get(name),
"Lookup [%s] not found",
name
).getLookupExtractorFactory().get();
final LookupExtractor lookupExtractor;

if (Strings.isNullOrEmpty(name)) {
lookupExtractor = this.lookup;
} else {
lookupExtractor = lookupExtractorFactoryContainerProvider
.get(name)
.orElseThrow(() -> new ISE("Lookup [%s] not found", name))
.getLookupExtractorFactory()
.get();
}

return new LookupExtractionFn(
lookupExtractor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,25 @@

package org.apache.druid.query.lookup;

import javax.annotation.Nullable;
import java.util.Optional;
import java.util.Set;

/**
* Provides {@link LookupExtractorFactoryContainer} to query and indexing time dimension transformations.
*
* The most important production implementation is LookupReferencesManager.
*/
@FunctionalInterface
public interface LookupExtractorFactoryContainerProvider
{
@Nullable
LookupExtractorFactoryContainer get(String lookupName);
/**
* Returns the set of all lookup names that {@link #get} can return containers for. Note that because the underlying
* set of valid lookups might change over time, it is not guaranteed that calling {@link #get} on the results will
* actually yield a container (it might have been removed).
*/
Set<String> getAllLookupNames();

/**
* Returns a lookup container for the provided lookupName, if it exists.
*/
Optional<LookupExtractorFactoryContainer> get(String lookupName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;

Expand Down Expand Up @@ -145,11 +146,11 @@ private LookupExtractionFn ensureDelegate()
// http://www.javamex.com/tutorials/double_checked_locking.shtml
synchronized (delegateLock) {
if (null == delegate) {
final LookupExtractor factory = Preconditions.checkNotNull(
manager.get(getLookup()),
"Lookup [%s] not found",
getLookup()
).getLookupExtractorFactory().get();
final LookupExtractor factory =
manager.get(getLookup())
.orElseThrow(() -> new ISE("Lookup [%s] not found", getLookup()))
.getLookupExtractorFactory()
.get();

delegate = new LookupExtractionFn(
factory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;

import java.util.Map;
Expand All @@ -41,7 +43,10 @@ public class JoinableFactoryModule implements Module
* Default mappings of datasources to factories.
*/
private static final Map<Class<? extends DataSource>, Class<? extends JoinableFactory>> FACTORY_MAPPINGS =
ImmutableMap.of(InlineDataSource.class, InlineJoinableFactory.class);
ImmutableMap.of(
InlineDataSource.class, InlineJoinableFactory.class,
LookupDataSource.class, LookupJoinableFactory.class
);

@Override
public void configure(Binder binder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.util.Optional;

@Path("/druid/v1/lookups/introspect")
@ResourceFilters(ConfigResourceFilter.class)
Expand All @@ -48,12 +49,15 @@ public LookupIntrospectionResource(
@Path("/{lookupId}")
public Object introspectLookup(@PathParam("lookupId") final String lookupId)
{
final LookupExtractorFactoryContainer container = lookupExtractorFactoryContainerProvider.get(lookupId);
final Optional<LookupExtractorFactoryContainer> maybeContainer =
lookupExtractorFactoryContainerProvider.get(lookupId);

if (container == null) {
if (!maybeContainer.isPresent()) {
LOGGER.error("trying to introspect non existing lookup [%s]", lookupId);
return Response.status(Response.Status.NOT_FOUND).build();
}

final LookupExtractorFactoryContainer container = maybeContainer.get();
LookupIntrospectHandler introspectHandler = container.getLookupExtractorFactory().getIntrospectHandler();
if (introspectHandler != null) {
return introspectHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -295,11 +296,16 @@ private void addNotice(Notice notice)
}

@Override
@Nullable
public LookupExtractorFactoryContainer get(String lookupName)
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return stateRef.get().lookupMap.get(lookupName);
return Optional.ofNullable(stateRef.get().lookupMap.get(lookupName));
}

@Override
public Set<String> getAllLookupNames()
{
return stateRef.get().lookupMap.keySet();
}

// Note that this should ensure that "toLoad" and "toDrop" are disjoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.apache.druid.query.dimension.LookupDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow
Expand Down Expand Up @@ -66,11 +68,16 @@ public void configure(Binder binder)
*/
private static class NoopLookupExtractorFactoryContainerProvider implements LookupExtractorFactoryContainerProvider
{
@Nullable
@Override
public LookupExtractorFactoryContainer get(String lookupName)
public Set<String> getAllLookupNames()
{
return null;
return Collections.emptySet();
}

@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@

/**
* A {@link JoinableFactory} for {@link InlineDataSource}. It works by building an {@link IndexedTable}.
*
* It is not valid to pass any other DataSource type to the "build" method.
*/
public class InlineJoinableFactory implements JoinableFactory
{
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
if (condition.canHashJoin() && dataSource instanceof InlineDataSource) {
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;

if (condition.canHashJoin()) {
final Set<String> rightKeyColumns = condition.getRightEquiConditionKeys();

return Optional.of(
Expand Down
Loading