diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md
index 23ef47f6549e..96ffd7f48cdf 100644
--- a/docs/content/querying/lookups.md
+++ b/docs/content/querying/lookups.md
@@ -323,6 +323,8 @@ It is possible to save the configuration across restarts such that a node will n
|Property|Description|Default|
|--------|-----------|-------|
|`druid.lookup.snapshotWorkingDir`| Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null|
+|`druid.lookup.numLookupLoadingThreads`| Number of threads for loading the lookups in parallel on startup. This thread pool is destroyed once startup is done. It is not kept during the lifetime of the JVM|Available Processors / 2|
+|`druid.lookup.enableLookupSyncOnStartup`|Enable the lookup synchronization process with coordinator on startup. The queryable nodes will fetch and load the lookups from the coordinator instead of waiting for the coordinator to load the lookups for them. Users may opt to disable this option if there are no lookups configured in the cluster.|true|
## Introspect a Lookup
diff --git a/extensions-core/histogram/pom.xml b/extensions-core/histogram/pom.xml
index 8eb5ae08db29..d6df9405bd5b 100644
--- a/extensions-core/histogram/pom.xml
+++ b/extensions-core/histogram/pom.xml
@@ -59,6 +59,13 @@
test-jar
test
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ test
+ test-jar
+
junit
junit
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
index 013e91b479f9..e34ea20fbc28 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
@@ -29,7 +29,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
-import io.druid.query.expression.TestExprMacroTable;
+import io.druid.query.expression.TestExpressionMacroTable;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
@@ -74,7 +74,7 @@ public int columnCacheSizeBytes()
jsonMapper.setInjectableValues(
new InjectableValues.Std()
- .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
+ .addValue(ExprMacroTable.class.getName(), TestExpressionMacroTable.INSTANCE)
.addValue(IndexIO.class, indexIO)
.addValue(ObjectMapper.class, jsonMapper)
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
diff --git a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java
index 7a98ffd5edb8..33f4de0fdbf5 100644
--- a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java
+++ b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java
@@ -35,8 +35,7 @@
@JsonSubTypes.Type(name = "default", value = DefaultDimensionSpec.class),
@JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class),
@JsonSubTypes.Type(name = "regexFiltered", value = RegexFilteredDimensionSpec.class),
- @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class),
- @JsonSubTypes.Type(name = "lookup", value = LookupDimensionSpec.class)
+ @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class)
})
public interface DimensionSpec extends Cacheable
{
diff --git a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java
index df6c86c4867f..5e6a3b95c0ac 100644
--- a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java
+++ b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java
@@ -24,7 +24,6 @@
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.Cacheable;
import io.druid.query.lookup.LookupExtractionFn;
-import io.druid.query.lookup.RegisteredLookupExtractionFn;
import javax.annotation.Nullable;
@@ -41,7 +40,6 @@
@JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class),
@JsonSubTypes.Type(name = "identity", value = IdentityExtractionFn.class),
@JsonSubTypes.Type(name = "lookup", value = LookupExtractionFn.class),
- @JsonSubTypes.Type(name = "registeredLookup", value = RegisteredLookupExtractionFn.class),
@JsonSubTypes.Type(name = "substring", value = SubstringDimExtractionFn.class),
@JsonSubTypes.Type(name = "cascade", value = CascadeExtractionFn.class),
@JsonSubTypes.Type(name = "stringFormat", value = StringFormatExtractionFn.class),
diff --git a/processing/src/main/java/io/druid/query/lookup/LookupConfig.java b/processing/src/main/java/io/druid/query/lookup/LookupConfig.java
index fea346b745fd..7f424c1e987c 100644
--- a/processing/src/main/java/io/druid/query/lookup/LookupConfig.java
+++ b/processing/src/main/java/io/druid/query/lookup/LookupConfig.java
@@ -26,11 +26,19 @@
public class LookupConfig
{
- @JsonProperty
- private final String snapshotWorkingDir;
+ @JsonProperty("snapshotWorkingDir")
+ private String snapshotWorkingDir;
+
+ @JsonProperty("enableLookupSyncOnStartup")
+ private boolean enableLookupSyncOnStartup = true;
+
+ @JsonProperty("numLookupLoadingThreads")
+ private int numLookupLoadingThreads = Runtime.getRuntime().availableProcessors() / 2;
/**
- * @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility
+ * @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility
+ * @param numLookupLoadingThreads number of threads for loading the lookups as part of the synchronization process
+ * @param enableLookupSyncOnStartup decides whether the lookup synchronization process should be enabled at startup
*/
@JsonCreator
public LookupConfig(
@@ -45,6 +53,15 @@ public String getSnapshotWorkingDir()
return snapshotWorkingDir;
}
+ public int getNumLookupLoadingThreads()
+ {
+ return numLookupLoadingThreads;
+ }
+
+ public boolean getEnableLookupSyncOnStartup()
+ {
+ return enableLookupSyncOnStartup;
+ }
@Override
public boolean equals(Object o)
@@ -58,7 +75,9 @@ public boolean equals(Object o)
LookupConfig that = (LookupConfig) o;
- return getSnapshotWorkingDir().equals(that.getSnapshotWorkingDir());
+ return snapshotWorkingDir.equals(that.snapshotWorkingDir) &&
+ enableLookupSyncOnStartup == that.enableLookupSyncOnStartup &&
+ numLookupLoadingThreads == that.numLookupLoadingThreads;
}
@@ -67,6 +86,8 @@ public String toString()
{
return "LookupConfig{" +
"snapshotWorkingDir='" + getSnapshotWorkingDir() + '\'' +
+ " numLookupLoadingThreads='" + getNumLookupLoadingThreads() + '\'' +
+ " enableLookupSyncOnStartup='" + getEnableLookupSyncOnStartup() + '\'' +
'}';
}
}
diff --git a/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java b/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java
index b1319cbe0146..8527b65b839b 100644
--- a/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java
+++ b/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java
@@ -20,17 +20,7 @@
package io.druid.query.expression;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import io.druid.math.expr.ExprMacroTable;
-import io.druid.query.extraction.MapLookupExtractor;
-import io.druid.query.lookup.LookupExtractor;
-import io.druid.query.lookup.LookupExtractorFactory;
-import io.druid.query.lookup.LookupExtractorFactoryContainer;
-import io.druid.query.lookup.LookupIntrospectHandler;
-import io.druid.query.lookup.LookupReferencesManager;
-import org.easymock.EasyMock;
-
-import javax.annotation.Nullable;
public class TestExprMacroTable extends ExprMacroTable
{
@@ -41,7 +31,6 @@ private TestExprMacroTable()
super(
ImmutableList.of(
new LikeExprMacro(),
- new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo"))),
new RegexpExtractExprMacro(),
new TimestampCeilExprMacro(),
new TimestampExtractExprMacro(),
@@ -55,52 +44,4 @@ private TestExprMacroTable()
)
);
}
-
- /**
- * Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo".
- */
- public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap theLookup)
- {
- final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
- EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn(
- new LookupExtractorFactoryContainer(
- "v0",
- new LookupExtractorFactory()
- {
- @Override
- public boolean start()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean close()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean replaces(@Nullable final LookupExtractorFactory other)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LookupIntrospectHandler getIntrospectHandler()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LookupExtractor get()
- {
- return new MapLookupExtractor(theLookup, false);
- }
- }
- )
- ).anyTimes();
- EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
- EasyMock.replay(lookupReferencesManager);
- return lookupReferencesManager;
- }
}
diff --git a/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java b/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java
index 836176e59135..c2e39bf0a0b3 100644
--- a/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java
+++ b/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java
@@ -32,13 +32,36 @@ public class LookupConfigTest
{
ObjectMapper mapper = TestHelper.getJsonMapper();
-
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Test
public void TestSerDesr() throws IOException
{
LookupConfig lookupConfig = new LookupConfig(temporaryFolder.newFile().getAbsolutePath());
- Assert.assertEquals(lookupConfig, mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig)));
+ Assert.assertEquals(
+ lookupConfig,
+ mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig))
+ );
+ }
+
+ @Test
+ public void testSerdeWithNonDefaults() throws Exception
+ {
+ String json = "{\n"
+ + " \"enableLookupSyncOnStartup\": false,\n"
+ + " \"snapshotWorkingDir\": \"/tmp\",\n"
+ + " \"numLookupLoadingThreads\": 4 \n"
+ + "}\n";
+ LookupConfig config = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(json, LookupConfig.class)
+ ),
+ LookupConfig.class
+ );
+
+ Assert.assertEquals("/tmp", config.getSnapshotWorkingDir());
+ Assert.assertEquals(false, config.getEnableLookupSyncOnStartup());
+ Assert.assertEquals(4, config.getNumLookupLoadingThreads());
}
}
diff --git a/processing/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java b/server/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java
similarity index 100%
rename from processing/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java
rename to server/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java
diff --git a/processing/src/main/java/io/druid/query/expression/LookupExprMacro.java b/server/src/main/java/io/druid/query/expression/LookupExprMacro.java
similarity index 100%
rename from processing/src/main/java/io/druid/query/expression/LookupExprMacro.java
rename to server/src/main/java/io/druid/query/expression/LookupExprMacro.java
diff --git a/server/src/main/java/io/druid/query/lookup/LookupModule.java b/server/src/main/java/io/druid/query/lookup/LookupModule.java
index c8fdeaf68c0d..59c0d7f96c3d 100644
--- a/server/src/main/java/io/druid/query/lookup/LookupModule.java
+++ b/server/src/main/java/io/druid/query/lookup/LookupModule.java
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -48,6 +49,7 @@
import io.druid.guice.annotations.Smile;
import io.druid.initialization.DruidModule;
import io.druid.java.util.common.logger.Logger;
+import io.druid.query.dimension.LookupDimensionSpec;
import io.druid.query.expression.LookupExprMacro;
import io.druid.server.DruidNode;
import io.druid.server.http.HostAndPortWithScheme;
@@ -84,7 +86,11 @@ public static String getTierListenerPath(String tier)
public List extends Module> getJacksonModules()
{
return ImmutableList.of(
- new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class)
+ new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class),
+ new SimpleModule().registerSubtypes(
+ new NamedType(LookupDimensionSpec.class, "lookup"),
+ new NamedType(RegisteredLookupExtractionFn.class, "registeredLookup")
+ )
);
}
diff --git a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java
similarity index 59%
rename from processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java
rename to server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java
index 1485fe725cae..961447ab356f 100644
--- a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java
+++ b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java
@@ -20,6 +20,7 @@
package io.druid.query.lookup;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -28,21 +29,34 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
+import com.metamx.http.client.response.FullResponseHolder;
+import io.druid.client.coordinator.Coordinator;
import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
+import io.druid.discovery.DruidLeaderClient;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
@@ -79,21 +93,44 @@ public class LookupReferencesManager
//for unit testing only
private final boolean testMode;
+ private final DruidLeaderClient druidLeaderClient;
+
+ private final ObjectMapper jsonMapper;
+
+ private static final TypeReference
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ test-jar
+ test
+
diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
index e9d12f65ccad..27f4b69bc71f 100644
--- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java
@@ -57,7 +57,7 @@
import io.druid.query.aggregation.FloatSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.expression.LookupExprMacro;
-import io.druid.query.expression.TestExprMacroTable;
+import io.druid.query.expression.TestExpressionMacroTable;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
@@ -140,7 +140,7 @@ public void configure(final Binder binder)
binder.bind(LookupReferencesManager.class)
.toInstance(
- TestExprMacroTable.createTestLookupReferencesManager(
+ TestExpressionMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"