From 489fdc67f40a99fb12087bb4492cf3d9adde0660 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 11 Jun 2020 06:18:57 -0700 Subject: [PATCH 1/7] global table datasource for broadcast segments --- .../org/apache/druid/query/DataSource.java | 3 +- .../druid/query/GlobalTableDataSource.java | 54 ++++ .../druid/server/LocalQuerySegmentWalker.java | 1 + .../apache/druid/server/SegmentManager.java | 6 + .../druid/sql/calcite/schema/DruidSchema.java | 233 ++++++++++-------- .../schema/DruidSchemaNoDataInitTest.java | 4 + .../sql/calcite/schema/DruidSchemaTest.java | 4 + .../sql/calcite/schema/SystemSchemaTest.java | 3 + .../druid/sql/calcite/util/CalciteTests.java | 4 + 9 files changed, 204 insertions(+), 108 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 549b06b0b3eb..47fb9ecdef81 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -35,7 +35,8 @@ @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"), @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"), - @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline") + @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"), + @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "global") }) public interface DataSource { diff --git a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java new file mode 100644 index 000000000000..44a93699aad6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("global") +public class GlobalTableDataSource extends TableDataSource +{ + @JsonCreator + public GlobalTableDataSource(@JsonProperty("name") String name) + { + super(name); + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public String toString() + { + return "GlobalTableDataSource{" + + "name='" + getName() + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 5e3928c799e0..8283524040f4 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -129,6 +129,7 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final .applyPostMergeDecoration() .emitCPUTimeMetric(emitter, cpuAccumulator); } + @Override public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 45b6538bb610..a5484f21c9a7 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -121,6 +122,11 @@ public Map getDataSourceSizes() return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize); } + public Set getDataSourceNames() + { + return dataSources.keySet(); + } + /** * Returns a map of dataSource to the number of segments managed by this segmentManager. This method should be * carefully because the returned map might be different from the actual data source states. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 35037e2ff2cb..c0c3b34876f5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -41,12 +41,14 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; @@ -56,6 +58,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthenticationResult; @@ -81,6 +84,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -100,8 +104,10 @@ public class DruidSchema extends AbstractSchema private final QueryLifecycleFactory queryLifecycleFactory; private final PlannerConfig config; + private final SegmentManager segmentManager; private final ViewManager viewManager; private final ExecutorService cacheExec; + private final ScheduledExecutorService localSegmentExec; private final ConcurrentMap tables; // For awaitInitialization. @@ -122,6 +128,8 @@ public class DruidSchema extends AbstractSchema // All dataSources that need tables regenerated. private final Set dataSourcesNeedingRebuild = new HashSet<>(); + private final Set broadcastDatasources = new HashSet<>(); + // All segments that need to be refreshed. private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); @@ -137,6 +145,7 @@ public class DruidSchema extends AbstractSchema public DruidSchema( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, + final SegmentManager segmentManager, final PlannerConfig config, final ViewManager viewManager, final Escalator escalator @@ -144,9 +153,11 @@ public DruidSchema( { this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); Preconditions.checkNotNull(serverView, "serverView"); + this.segmentManager = segmentManager; this.config = Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); + this.localSegmentExec = Execs.scheduledSingleThreaded("DruidSchema-SegmentCache-%d"); this.tables = new ConcurrentHashMap<>(); this.escalator = escalator; @@ -196,119 +207,130 @@ public ServerView.CallbackAction serverSegmentRemoved( public void start() throws InterruptedException { cacheExec.submit( - new Runnable() - { - @Override - public void run() - { - try { - while (!Thread.currentThread().isInterrupted()) { - final Set segmentsToRefresh = new TreeSet<>(); - final Set dataSourcesToRebuild = new TreeSet<>(); - - try { - synchronized (lock) { - final long nextRefreshNoFuzz = DateTimes - .utc(lastRefresh) - .plus(config.getMetadataRefreshPeriod()) - .getMillis(); - - // Fuzz a bit to spread load out when we have multiple brokers. - final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); - - while (true) { - // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). - final boolean wasRecentFailure = DateTimes.utc(lastFailure) - .plus(config.getMetadataRefreshPeriod()) - .isAfterNow(); - - if (isServerViewInitialized && - !wasRecentFailure && - (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && - (refreshImmediately || nextRefresh < System.currentTimeMillis())) { - // We need to do a refresh. Break out of the waiting loop. - break; - } - - if (isServerViewInitialized) { - // Server view is initialized, but we don't need to do a refresh. Could happen if there are - // no segments in the system yet. Just mark us as initialized, then. - initialized.countDown(); - } - - // Wait some more, we'll wake up when it might be time to do another refresh. - lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); + () -> { + try { + while (!Thread.currentThread().isInterrupted()) { + final Set segmentsToRefresh = new TreeSet<>(); + final Set dataSourcesToRebuild = new TreeSet<>(); + + try { + synchronized (lock) { + final long nextRefreshNoFuzz = DateTimes + .utc(lastRefresh) + .plus(config.getMetadataRefreshPeriod()) + .getMillis(); + + // Fuzz a bit to spread load out when we have multiple brokers. + final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); + + while (true) { + // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). + final boolean wasRecentFailure = DateTimes.utc(lastFailure) + .plus(config.getMetadataRefreshPeriod()) + .isAfterNow(); + + if (isServerViewInitialized && + !wasRecentFailure && + (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && + (refreshImmediately || nextRefresh < System.currentTimeMillis())) { + // We need to do a refresh. Break out of the waiting loop. + break; } - segmentsToRefresh.addAll(segmentsNeedingRefresh); - segmentsNeedingRefresh.clear(); - - // Mutable segments need a refresh every period, since new columns could be added dynamically. - segmentsNeedingRefresh.addAll(mutableSegments); + if (isServerViewInitialized) { + // Server view is initialized, but we don't need to do a refresh. Could happen if there are + // no segments in the system yet. Just mark us as initialized, then. + initialized.countDown(); + } - lastFailure = 0L; - lastRefresh = System.currentTimeMillis(); - refreshImmediately = false; + // Wait some more, we'll wake up when it might be time to do another refresh. + lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); } - // Refresh the segments. - final Set refreshed = refreshSegments(segmentsToRefresh); + segmentsToRefresh.addAll(segmentsNeedingRefresh); + segmentsNeedingRefresh.clear(); - synchronized (lock) { - // Add missing segments back to the refresh list. - segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); + // Mutable segments need a refresh every period, since new columns could be added dynamically. + segmentsNeedingRefresh.addAll(mutableSegments); - // Compute the list of dataSources to rebuild tables for. - dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); - refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); - dataSourcesNeedingRebuild.clear(); + lastFailure = 0L; + lastRefresh = System.currentTimeMillis(); + refreshImmediately = false; + } - lock.notifyAll(); - } + // Refresh the segments. + final Set refreshed = refreshSegments(segmentsToRefresh); - // Rebuild the dataSources. - for (String dataSource : dataSourcesToRebuild) { - final DruidTable druidTable = buildDruidTable(dataSource); - final DruidTable oldTable = tables.put(dataSource, druidTable); - if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); - } else { - log.debug("dataSource [%s] signature is unchanged.", dataSource); - } - } + synchronized (lock) { + // Add missing segments back to the refresh list. + segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); - initialized.countDown(); - } - catch (InterruptedException e) { - // Fall through. - throw e; + // Compute the list of dataSources to rebuild tables for. + dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); + refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); + dataSourcesNeedingRebuild.clear(); + + lock.notifyAll(); } - catch (Exception e) { - log.warn(e, "Metadata refresh failed, trying again soon."); - - synchronized (lock) { - // Add our segments and dataSources back to their refresh and rebuild lists. - segmentsNeedingRefresh.addAll(segmentsToRefresh); - dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); - lastFailure = System.currentTimeMillis(); - lock.notifyAll(); + + // Rebuild the dataSources. + for (String dataSource : dataSourcesToRebuild) { + final DruidTable druidTable = buildDruidTable(dataSource); + final DruidTable oldTable = tables.put(dataSource, druidTable); + if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { + log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); + } else { + log.debug("dataSource [%s] signature is unchanged.", dataSource); } } + + initialized.countDown(); + } + catch (InterruptedException e) { + // Fall through. + throw e; + } + catch (Exception e) { + log.warn(e, "Metadata refresh failed, trying again soon."); + + synchronized (lock) { + // Add our segments and dataSources back to their refresh and rebuild lists. + segmentsNeedingRefresh.addAll(segmentsToRefresh); + dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); + lastFailure = System.currentTimeMillis(); + lock.notifyAll(); + } } - } - catch (InterruptedException e) { - // Just exit. - } - catch (Throwable e) { - // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like - // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. - log.makeAlert(e, "Metadata refresh failed permanently").emit(); - throw e; - } - finally { - log.info("Metadata refresh stopped."); } } + catch (InterruptedException e) { + // Just exit. + } + catch (Throwable e) { + // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like + // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. + log.makeAlert(e, "Metadata refresh failed permanently").emit(); + throw e; + } + finally { + log.info("Metadata refresh stopped."); + } + } + ); + + ScheduledExecutors.scheduleWithFixedDelay( + localSegmentExec, + config.getMetadataRefreshPeriod().toStandardDuration(), + config.getMetadataRefreshPeriod().toStandardDuration(), + () -> { + synchronized (lock) { + // refresh known broadcast segments + Set localSegmentDatasources = segmentManager.getDataSourceNames(); + dataSourcesNeedingRebuild.addAll(localSegmentDatasources); + broadcastDatasources.clear(); + broadcastDatasources.addAll(localSegmentDatasources); + lock.notifyAll(); + } } ); @@ -324,6 +346,7 @@ public void run() public void stop() { cacheExec.shutdownNow(); + localSegmentExec.shutdownNow(); } public void awaitInitialization() throws InterruptedException @@ -350,12 +373,6 @@ protected Multimap getFunctionMultim @VisibleForTesting void addSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree - // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata - // loop... - return; - } synchronized (lock) { final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; @@ -434,10 +451,6 @@ void removeSegment(final DataSegment segment) @VisibleForTesting void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // cheese it - return; - } synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); @@ -616,6 +629,12 @@ private DruidTable buildDruidTable(final String dataSource) final RowSignature.Builder builder = RowSignature.builder(); columnTypes.forEach(builder::add); + if (broadcastDatasources.contains(dataSource)) { + return new DruidTable( + new GlobalTableDataSource(dataSource), + builder.build() + ); + } return new DruidTable(new TableDataSource(dataSource), builder.build()); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 713dfffedf69..fd20fb594e85 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.util.CalciteTestBase; @@ -30,6 +32,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -50,6 +53,7 @@ public void testInitializationWithNoData() throws Exception conglomerate ), new TestServerInventoryView(Collections.emptyList()), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index ee3bca1a4f8e..ecc7c6395ac2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -41,8 +41,10 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.NoopEscalator; @@ -59,6 +61,7 @@ import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -194,6 +197,7 @@ public void setUp() throws Exception schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 80bf83030f35..3e1356af7b98 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -67,9 +67,11 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; @@ -239,6 +241,7 @@ public Authorizer getAuthorizer(String name) druidSchema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 588d5adb8e37..0ca415b8746a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -73,11 +73,13 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; @@ -107,6 +109,7 @@ import org.apache.druid.sql.calcite.view.ViewManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.chrono.ISOChronology; @@ -866,6 +869,7 @@ private static DruidSchema createMockSchema( final DruidSchema schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), plannerConfig, viewManager, TEST_AUTHENTICATOR_ESCALATOR From eeb174a22d6a2d6823c45c997ef88adb625d6e3e Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 12 Jun 2020 04:51:25 -0700 Subject: [PATCH 2/7] tests --- .../query/GlobalTableDataSourceTest.java | 59 ++++++++++++ .../druid/server/SegmentManagerTest.java | 26 ++--- .../druid/sql/calcite/schema/DruidSchema.java | 2 +- .../schema/DruidCalciteSchemaModuleTest.java | 4 + .../sql/calcite/schema/DruidSchemaTest.java | 94 ++++++++++++------- 5 files changed, 141 insertions(+), 44 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java diff --git a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java new file mode 100644 index 000000000000..ea500109c0c7 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.druid.query; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +public class GlobalTableDataSourceTest +{ + private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo"); + public void testEquals() + { + EqualsVerifier.forClass(GlobalTableDataSource.class).usingGetClass().verify(); + } + + @Test + public void testIsGlobal() + { + Assert.assertTrue(GLOBAL_TABLE_DATA_SOURCE.isGlobal()); + } + + @Test + public void testIsCacheable() + { + Assert.assertFalse(GLOBAL_TABLE_DATA_SOURCE.isCacheable()); + } + + @Test + public void testSerde() throws JsonProcessingException + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final GlobalTableDataSource deserialized = (GlobalTableDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(GLOBAL_TABLE_DATA_SOURCE), + DataSource.class + ); + + Assert.assertEquals(GLOBAL_TABLE_DATA_SOURCE, deserialized); + } +} diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 562fc1dfffbf..a91411b8db73 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -421,17 +422,19 @@ public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingExcep @SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed. private void assertResult(List expectedExistingSegments) throws SegmentLoadingException { - final Map expectedDataSourceSizes = expectedExistingSegments - .stream() - .collect(Collectors.toMap(DataSegment::getDataSource, DataSegment::getSize, Long::sum)); - final Map expectedDataSourceCounts = expectedExistingSegments - .stream() - .collect(Collectors.toMap(DataSegment::getDataSource, segment -> 1L, Long::sum)); - final Map> expectedDataSources - = new HashMap<>(); + final Map expectedDataSourceSizes = + expectedExistingSegments.stream() + .collect(Collectors.toMap(DataSegment::getDataSource, DataSegment::getSize, Long::sum)); + final Map expectedDataSourceCounts = + expectedExistingSegments.stream() + .collect(Collectors.toMap(DataSegment::getDataSource, segment -> 1L, Long::sum)); + final Set expectedDataSourceNames = expectedExistingSegments.stream() + .map(DataSegment::getDataSource) + .collect(Collectors.toSet()); + final Map> expectedTimelines = new HashMap<>(); for (DataSegment segment : expectedExistingSegments) { final VersionedIntervalTimeline expectedTimeline = - expectedDataSources.computeIfAbsent( + expectedTimelines.computeIfAbsent( segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural()) ); @@ -444,11 +447,12 @@ private void assertResult(List expectedExistingSegments) throws Seg ); } + Assert.assertEquals(expectedDataSourceNames, segmentManager.getDataSourceNames()); Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts()); Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes()); final Map dataSources = segmentManager.getDataSources(); - Assert.assertEquals(expectedDataSources.size(), dataSources.size()); + Assert.assertEquals(expectedTimelines.size(), dataSources.size()); dataSources.forEach( (sourceName, dataSourceState) -> { @@ -458,7 +462,7 @@ private void assertResult(List expectedExistingSegments) throws Seg dataSourceState.getTotalSegmentSize() ); Assert.assertEquals( - expectedDataSources.get(sourceName).getAllTimelineEntries(), + expectedTimelines.get(sourceName).getAllTimelineEntries(), dataSourceState.getTimeline().getAllTimelineEntries() ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index c0c3b34876f5..6d4db1f7fc97 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -605,7 +605,7 @@ void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegme } } - private DruidTable buildDruidTable(final String dataSource) + protected DruidTable buildDruidTable(final String dataSource) { synchronized (lock) { final Map segmentMap = segmentMetadataInfo.get(dataSource); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index edba60a35c2e..11c2b97f4244 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -39,6 +39,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.lookup.LookupReferencesManager; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -84,6 +85,8 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase private ObjectMapper objectMapper; @Mock private LookupReferencesManager lookupReferencesManager; + @Mock + private SegmentManager segmentManager; private DruidCalciteSchemaModule target; private Injector injector; @@ -104,6 +107,7 @@ public void setUp() binder.bind(Escalator.class).toInstance(escalator); binder.bind(AuthorizerMapper.class).toInstance(authorizerMapper); binder.bind(InventoryView.class).toInstance(serverInventoryView); + binder.bind(SegmentManager.class).toInstance(segmentManager); binder.bind(DruidLeaderClient.class) .annotatedWith(Coordinator.class) .toInstance(coordinatorDruidLeaderClient); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index ecc7c6395ac2..5bf2f981632d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -33,7 +34,9 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -59,9 +62,9 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; -import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -75,11 +78,21 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class DruidSchemaTest extends CalciteTestBase { - private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig(); + private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig() + { + @Override + public Period getMetadataRefreshPeriod() + { + return new Period("PT1S"); + } + }; private static final List ROWS1 = ImmutableList.of( CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")), @@ -97,6 +110,8 @@ public class DruidSchemaTest extends CalciteTestBase private static Closer resourceCloser; private List druidServers; + private CountDownLatch getDatasourcesLatch = new CountDownLatch(1); + private CountDownLatch buildTableLatch = new CountDownLatch(1); @BeforeClass public static void setUpClass() @@ -116,10 +131,13 @@ public static void tearDownClass() throws IOException private SpecificSegmentsQuerySegmentWalker walker = null; private DruidSchema schema = null; + private SegmentManager segmentManager; + private Set dataSourceNames; @Before public void setUp() throws Exception { + dataSourceNames = Sets.newConcurrentHashSet(); final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() .tmpDir(new File(tmpDir, "1")) @@ -149,6 +167,16 @@ public void setUp() throws Exception .rows(ROWS2) .buildMMappedIndex(); + segmentManager = new SegmentManager(EasyMock.createMock(SegmentLoader.class)) + { + @Override + public Set getDataSourceNames() + { + getDatasourcesLatch.countDown(); + return dataSourceNames; + } + }; + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(CalciteTests.DATASOURCE1) @@ -197,11 +225,20 @@ public void setUp() throws Exception schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, - new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + segmentManager, PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() - ); + ) + { + @Override + protected DruidTable buildDruidTable(String dataSource) + { + DruidTable table = super.buildDruidTable(dataSource); + buildTableLatch.countDown(); + return table; + } + }; schema.start(); schema.awaitInitialization(); @@ -424,34 +461,27 @@ public void testAvailableSegmentMetadataIsRealtime() } @Test - public void testAvailableSegmentFromBrokerIsIgnored() + public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException { - - Assert.assertEquals(4, schema.getTotalSegments()); - - DruidServerMetadata metadata = new DruidServerMetadata( - "broker", - "localhost:0", - null, - 1000L, - ServerType.BROKER, - "broken", - 0 - ); - - DataSegment segment = new DataSegment( - "test", - Intervals.of("2011-04-01/2011-04-11"), - "v1", - ImmutableMap.of(), - ImmutableList.of(), - ImmutableList.of(), - NoneShardSpec.instance(), - 1, - 100L - ); - schema.addSegment(metadata, segment); - Assert.assertEquals(4, schema.getTotalSegments()); - + DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + + dataSourceNames.add("foo"); + // wait for build + buildTableLatch.await(1, TimeUnit.SECONDS); + buildTableLatch = new CountDownLatch(1); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); } + } From bda1c7f86a1f74b0868472f0dd9b1b10ec9c9ed1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 12 Jun 2020 06:34:04 -0700 Subject: [PATCH 3/7] fix --- .../main/java/org/apache/druid/query/TableDataSource.java | 2 +- .../org/apache/druid/query/GlobalTableDataSourceTest.java | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index 4c371cf84510..e75a7e8bbb82 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -98,7 +98,7 @@ public final boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof TableDataSource)) { + if (!(o instanceof TableDataSource) || !getClass().equals(o.getClass())) { return false; } diff --git a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java index ea500109c0c7..bd9c4782126c 100644 --- a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.druid.query; import com.fasterxml.jackson.core.JsonProcessingException; @@ -28,9 +29,14 @@ public class GlobalTableDataSourceTest { private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo"); + + @Test public void testEquals() { - EqualsVerifier.forClass(GlobalTableDataSource.class).usingGetClass().verify(); + EqualsVerifier.forClass(GlobalTableDataSource.class) + .usingGetClass() + .withNonnullFields("name") + .verify(); } @Test From aff3b0ab96f11bb94cbe3a441adb8b1c1ec78b28 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 12 Jun 2020 12:29:22 -0700 Subject: [PATCH 4/7] fix test --- .../main/java/org/apache/druid/query/TableDataSource.java | 7 ++++++- .../test/java/org/apache/druid/query/DataSourceTest.java | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index e75a7e8bbb82..917545d894ea 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -98,7 +98,12 @@ public final boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof TableDataSource) || !getClass().equals(o.getClass())) { + if (!(o instanceof TableDataSource)) { + return false; + } + + if ((o instanceof GlobalTableDataSource || this instanceof GlobalTableDataSource) && + !getClass().equals(o.getClass())) { return false; } diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 090570db7acb..7c7f50f281bb 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -99,5 +99,4 @@ public void testUnionDataSource() throws Exception final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class); Assert.assertEquals(dataSource, serde); } - } From 9d7a9b6fce0b4d8ba4f9e7f4321aa6eb24458f01 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 12 Jun 2020 15:59:27 -0700 Subject: [PATCH 5/7] comments and javadocs --- .../org/apache/druid/query/GlobalTableDataSource.java | 10 ++++++++++ .../apache/druid/sql/calcite/schema/DruidSchema.java | 4 +++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java index 44a93699aad6..7e9048df4158 100644 --- a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java @@ -23,6 +23,16 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +/** + * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a + * {@link org.apache.druid.segment.join.JoinableFactory} that can create an + * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows + * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed + * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using + * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the + * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join, + * they will be treated as any normal segment. + */ @JsonTypeName("global") public class GlobalTableDataSource extends TableDataSource { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 6d4db1f7fc97..9fae29a7041e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -324,7 +324,9 @@ public void start() throws InterruptedException config.getMetadataRefreshPeriod().toStandardDuration(), () -> { synchronized (lock) { - // refresh known broadcast segments + // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have + // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the + // future by fetching load rules from the coordinator Set localSegmentDatasources = segmentManager.getDataSourceNames(); dataSourcesNeedingRebuild.addAll(localSegmentDatasources); broadcastDatasources.clear(); From 7c5067460dd691ba3e090af67fa6da8fe05e3ddc Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Jun 2020 05:18:59 -0700 Subject: [PATCH 6/7] review stuffs --- .../org/apache/druid/query/DataSource.java | 2 +- .../druid/query/GlobalTableDataSource.java | 14 +- .../query/GlobalTableDataSourceTest.java | 10 +- .../apache/druid/client/BrokerServerView.java | 57 ++++--- .../druid/sql/calcite/schema/DruidSchema.java | 159 +++++++++--------- .../sql/calcite/schema/DruidSchemaTest.java | 37 +++- .../calcite/util/TestServerInventoryView.java | 78 ++++++++- 7 files changed, 227 insertions(+), 130 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 47fb9ecdef81..ff768eda25b0 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -36,7 +36,7 @@ @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"), @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"), - @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "global") + @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable") }) public interface DataSource { diff --git a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java index 7e9048df4158..da5f1390ca3d 100644 --- a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java @@ -27,13 +27,13 @@ * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a * {@link org.apache.druid.segment.join.JoinableFactory} that can create an * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows - * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed - * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using + * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be + * pushed down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join, - * they will be treated as any normal segment. + * they will be treated as any normal table datasource. */ -@JsonTypeName("global") +@JsonTypeName("globalTable") public class GlobalTableDataSource extends TableDataSource { @JsonCreator @@ -42,12 +42,6 @@ public GlobalTableDataSource(@JsonProperty("name") String name) super(name); } - @Override - public boolean isCacheable() - { - return false; - } - @Override public boolean isGlobal() { diff --git a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java index bd9c4782126c..fea379015ad3 100644 --- a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java @@ -40,15 +40,17 @@ public void testEquals() } @Test - public void testIsGlobal() + public void testGlobalTableIsNotEqualsTable() { - Assert.assertTrue(GLOBAL_TABLE_DATA_SOURCE.isGlobal()); + TableDataSource tbl = new TableDataSource(GLOBAL_TABLE_DATA_SOURCE.getName()); + Assert.assertNotEquals(GLOBAL_TABLE_DATA_SOURCE, tbl); + Assert.assertNotEquals(tbl, GLOBAL_TABLE_DATA_SOURCE); } @Test - public void testIsCacheable() + public void testIsGlobal() { - Assert.assertFalse(GLOBAL_TABLE_DATA_SOURCE.isCacheable()); + Assert.assertTrue(GLOBAL_TABLE_DATA_SOURCE.isGlobal()); } @Test diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 3b5d40872cd6..debd72131c23 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -218,51 +218,54 @@ private QueryableDruidServer removeServer(DruidServer server) private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree - // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query - // loop... - return; - } SegmentId segmentId = segment.getId(); synchronized (lock) { - log.debug("Adding segment[%s] for server[%s]", segment, server); - - ServerSelector selector = selectors.get(segmentId); - if (selector == null) { - selector = new ServerSelector(segment, tierSelectorStrategy); + // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree + // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query + // loop... + if (!server.getType().equals(ServerType.BROKER)) { + log.debug("Adding segment[%s] for server[%s]", segment, server); + ServerSelector selector = selectors.get(segmentId); + if (selector == null) { + selector = new ServerSelector(segment, tierSelectorStrategy); + + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + timelines.put(segment.getDataSource(), timeline); + } - VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); - if (timeline == null) { - timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - timelines.put(segment.getDataSource(), timeline); + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); + selectors.put(segmentId, selector); } - timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); - selectors.put(segmentId, selector); - } - - QueryableDruidServer queryableDruidServer = clients.get(server.getName()); - if (queryableDruidServer == null) { - queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); + QueryableDruidServer queryableDruidServer = clients.get(server.getName()); + if (queryableDruidServer == null) { + queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); + } + selector.addServerAndUpdateSegment(queryableDruidServer, segment); } - selector.addServerAndUpdateSegment(queryableDruidServer, segment); + // run the callbacks, even if the segment came from a broker, lets downstream watchers decide what to do with it runTimelineCallbacks(callback -> callback.segmentAdded(server, segment)); } } private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // might as well save the trouble of grabbing a lock for something that isn't there.. - return; - } + SegmentId segmentId = segment.getId(); final ServerSelector selector; synchronized (lock) { log.debug("Removing segment[%s] from server[%s].", segmentId, server); + // we don't store broker segments here, but still run the callbacks for the segment being removed from the server + // since the broker segments are not stored on the timeline, do not fire segmentRemoved event + if (server.getType().equals(ServerType.BROKER)) { + runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment)); + return; + } + selector = selectors.get(segmentId); if (selector == null) { log.warn("Told to remove non-existant segment[%s]", segmentId); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 9fae29a7041e..6762aaf13c94 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; @@ -84,7 +83,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -107,7 +105,6 @@ public class DruidSchema extends AbstractSchema private final SegmentManager segmentManager; private final ViewManager viewManager; private final ExecutorService cacheExec; - private final ScheduledExecutorService localSegmentExec; private final ConcurrentMap tables; // For awaitInitialization. @@ -123,22 +120,27 @@ public class DruidSchema extends AbstractSchema private int totalSegments = 0; // All mutable segments. + @GuardedBy("lock") private final Set mutableSegments = new TreeSet<>(SEGMENT_ORDER); // All dataSources that need tables regenerated. + @GuardedBy("lock") private final Set dataSourcesNeedingRebuild = new HashSet<>(); - private final Set broadcastDatasources = new HashSet<>(); - // All segments that need to be refreshed. + @GuardedBy("lock") private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); // Escalator, so we can attach an authentication result to queries we generate. private final Escalator escalator; + @GuardedBy("lock") private boolean refreshImmediately = false; + @GuardedBy("lock") private long lastRefresh = 0L; + @GuardedBy("lock") private long lastFailure = 0L; + @GuardedBy("lock") private boolean isServerViewInitialized = false; @Inject @@ -157,7 +159,6 @@ public DruidSchema( this.config = Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); - this.localSegmentExec = Execs.scheduledSingleThreaded("DruidSchema-SegmentCache-%d"); this.tables = new ConcurrentHashMap<>(); this.escalator = escalator; @@ -318,24 +319,6 @@ public void start() throws InterruptedException } ); - ScheduledExecutors.scheduleWithFixedDelay( - localSegmentExec, - config.getMetadataRefreshPeriod().toStandardDuration(), - config.getMetadataRefreshPeriod().toStandardDuration(), - () -> { - synchronized (lock) { - // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have - // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the - // future by fetching load rules from the coordinator - Set localSegmentDatasources = segmentManager.getDataSourceNames(); - dataSourcesNeedingRebuild.addAll(localSegmentDatasources); - broadcastDatasources.clear(); - broadcastDatasources.addAll(localSegmentDatasources); - lock.notifyAll(); - } - } - ); - if (config.isAwaitInitializationOnStart()) { final long startNanos = System.nanoTime(); log.debug("%s waiting for initialization.", getClass().getSimpleName()); @@ -348,7 +331,6 @@ public void start() throws InterruptedException public void stop() { cacheExec.shutdownNow(); - localSegmentExec.shutdownNow(); } public void awaitInitialization() throws InterruptedException @@ -376,44 +358,50 @@ protected Multimap getFunctionMultim void addSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { - final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); - AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; - if (segmentMetadata == null) { - // segmentReplicatable is used to determine if segments are served by historical or realtime servers - long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; - segmentMetadata = AvailableSegmentMetadata.builder( - segment, - isRealtime, - ImmutableSet.of(server), - null, - DEFAULT_NUM_ROWS - ).build(); - // Unknown segment. - setAvailableSegmentMetadata(segment.getId(), segmentMetadata); - segmentsNeedingRefresh.add(segment.getId()); - if (!server.isSegmentReplicationTarget()) { - log.debug("Added new mutable segment[%s].", segment.getId()); - mutableSegments.add(segment.getId()); - } else { - log.debug("Added new immutable segment[%s].", segment.getId()); - } + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it needs to be globalized + dataSourcesNeedingRebuild.add(segment.getDataSource()); } else { - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServers) - .add(server) - .build(); - final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - knownSegments.put(segment.getId(), metadataWithNumReplicas); - if (server.isSegmentReplicationTarget()) { - // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, - // even if it's also available on non-replicatable (realtime) servers. - mutableSegments.remove(segment.getId()); - log.debug("Segment[%s] has become immutable.", segment.getId()); + final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); + AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; + if (segmentMetadata == null) { + // segmentReplicatable is used to determine if segments are served by historical or realtime servers + long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; + segmentMetadata = AvailableSegmentMetadata.builder( + segment, + isRealtime, + ImmutableSet.of(server), + null, + DEFAULT_NUM_ROWS + ).build(); + // Unknown segment. + setAvailableSegmentMetadata(segment.getId(), segmentMetadata); + segmentsNeedingRefresh.add(segment.getId()); + if (!server.isSegmentReplicationTarget()) { + log.debug("Added new mutable segment[%s].", segment.getId()); + mutableSegments.add(segment.getId()); + } else { + log.debug("Added new immutable segment[%s].", segment.getId()); + } + } else { + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = new ImmutableSet.Builder() + .addAll(segmentServers) + .add(server) + .build(); + final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + knownSegments.put(segment.getId(), metadataWithNumReplicas); + if (server.isSegmentReplicationTarget()) { + // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, + // even if it's also available on non-replicatable (realtime) servers. + mutableSegments.remove(segment.getId()); + log.debug("Segment[%s] has become immutable.", segment.getId()); + } } } if (!tables.containsKey(segment.getDataSource())) { @@ -455,20 +443,26 @@ void removeServerSegment(final DruidServerMetadata server, final DataSegment seg { synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); - final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); - final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = FluentIterable - .from(segmentServers) - .filter(Predicates.not(Predicates.equalTo(server))) - .toSet(); - - final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - knownSegments.put(segment.getId(), metadataWithNumReplicas); + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it might no longer be broadcast or something + dataSourcesNeedingRebuild.add(segment.getDataSource()); + } else { + final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); + final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = FluentIterable + .from(segmentServers) + .filter(Predicates.not(Predicates.equalTo(server))) + .toSet(); + + final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + knownSegments.put(segment.getId(), metadataWithNumReplicas); + } lock.notifyAll(); } } @@ -631,13 +625,14 @@ protected DruidTable buildDruidTable(final String dataSource) final RowSignature.Builder builder = RowSignature.builder(); columnTypes.forEach(builder::add); - if (broadcastDatasources.contains(dataSource)) { - return new DruidTable( - new GlobalTableDataSource(dataSource), - builder.build() - ); + + final TableDataSource tableDataSource; + if (segmentManager.getDataSourceNames().contains(dataSource)) { + tableDataSource = new GlobalTableDataSource(dataSource); + } else { + tableDataSource = new TableDataSource(dataSource); } - return new DruidTable(new TableDataSource(dataSource), builder.build()); + return new DruidTable(tableDataSource, builder.build()); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 5bf2f981632d..8455965ef5e9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -29,7 +29,6 @@ import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -109,6 +108,7 @@ public Period getMetadataRefreshPeriod() private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private TestServerInventoryView serverView; private List druidServers; private CountDownLatch getDatasourcesLatch = new CountDownLatch(1); private CountDownLatch buildTableLatch = new CountDownLatch(1); @@ -219,7 +219,7 @@ public Set getDataSourceNames() PruneSpecsHolder.DEFAULT ); final List realtimeSegments = ImmutableList.of(segment1); - final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); + serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); schema = new DruidSchema( @@ -468,7 +468,22 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + final DataSegment someNewBrokerSegment = new DataSegment( + "foo", + Intervals.of("2012/2013"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + new NumberedShardSpec(2, 3), + null, + 1, + 100L, + PruneSpecsHolder.DEFAULT + ); dataSourceNames.add("foo"); + serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); + // wait for build buildTableLatch.await(1, TimeUnit.SECONDS); buildTableLatch = new CountDownLatch(1); @@ -482,6 +497,24 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + + // now remove it + dataSourceNames.remove("foo"); + serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); + + // wait for build + buildTableLatch.await(1, TimeUnit.SECONDS); + buildTableLatch = new CountDownLatch(1); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index a8e498beb1f0..170d205f449d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -63,18 +64,31 @@ public class TestServerInventoryView implements TimelineServerView "dummy", 0 ); - private final List segments; + private static final DruidServerMetadata DUMMY_BROKER = new DruidServerMetadata( + "dummy3", + "dummy3", + null, + 0, + ServerType.BROKER, + "dummy", + 0 + ); + private List segments = new ArrayList<>(); private List realtimeSegments = new ArrayList<>(); + private List brokerSegments = new ArrayList<>(); + + private List> segmentCallbackExecs = new ArrayList<>(); + private List> timelineCallbackExecs = new ArrayList<>(); public TestServerInventoryView(List segments) { - this.segments = ImmutableList.copyOf(segments); + this.segments.addAll(segments); } public TestServerInventoryView(List segments, List realtimeSegments) { - this.segments = ImmutableList.copyOf(segments); - this.realtimeSegments = ImmutableList.copyOf(realtimeSegments); + this.segments.addAll(segments); + this.realtimeSegments.addAll(realtimeSegments); } @Override @@ -87,6 +101,7 @@ public Optional> getTimeline(Da @Override public List getDruidServers() { + // do not return broker on purpose to mimic behavior of BrokerServerView final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments); final ImmutableDruidServer server = new ImmutableDruidServer( DUMMY_SERVER, @@ -118,6 +133,7 @@ public void registerSegmentCallback(Executor exec, final SegmentCallback callbac exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); } exec.execute(callback::segmentViewInitialized); + segmentCallbackExecs.add(new Pair<>(exec, callback)); } @Override @@ -130,6 +146,7 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); } exec.execute(callback::timelineInitialized); + timelineCallbackExecs.add(new Pair<>(exec, callback)); } @Override @@ -143,4 +160,57 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c { // Do nothing } + + public void addSegment(DataSegment segment, ServerType serverType) + { + final Pair> whichServerAndSegments = + getDummyServerAndSegmentsForType(serverType); + final DruidServerMetadata whichServer = whichServerAndSegments.lhs; + whichServerAndSegments.rhs.add(segment); + segmentCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment)) + ); + timelineCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment)) + ); + } + + public void removeSegment(DataSegment segment, ServerType serverType) + { + final Pair> whichServerAndSegments = + getDummyServerAndSegmentsForType(serverType); + final DruidServerMetadata whichServer = whichServerAndSegments.lhs; + whichServerAndSegments.rhs.remove(segment); + segmentCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentRemoved(whichServer, segment)) + ); + timelineCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> { + execAndCallback.rhs.serverSegmentRemoved(whichServer, segment); + // assume that all replicas have been removed and fire this one too + execAndCallback.rhs.segmentRemoved(segment); + }) + ); + } + + private Pair> getDummyServerAndSegmentsForType(ServerType serverType) + { + final DruidServerMetadata whichServer; + final List whichSegments; + switch (serverType) { + case BROKER: + whichServer = DUMMY_BROKER; + whichSegments = brokerSegments; + break; + case REALTIME: + whichServer = DUMMY_SERVER_REALTIME; + whichSegments = realtimeSegments; + break; + default: + whichServer = DUMMY_SERVER; + whichSegments = segments; + break; + } + return new Pair<>(whichServer, whichSegments); + } } From 5f0e4c2096540f70f3235567ae9e3c8399f61d57 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Jun 2020 14:46:19 -0700 Subject: [PATCH 7/7] use generated equals and hashcode --- .../apache/druid/query/TableDataSource.java | 22 +++++-------------- .../druid/query/TableDataSourceTest.java | 2 +- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index 7199f2c5a6ae..469d5be21719 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; @JsonTypeName("table") @@ -99,32 +100,21 @@ public String toString() } @Override - public final boolean equals(Object o) + public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof TableDataSource)) { + if (o == null || getClass() != o.getClass()) { return false; } - - if ((o instanceof GlobalTableDataSource || this instanceof GlobalTableDataSource) && - !getClass().equals(o.getClass())) { - return false; - } - TableDataSource that = (TableDataSource) o; - - if (!name.equals(that.name)) { - return false; - } - - return true; + return name.equals(that.name); } @Override - public final int hashCode() + public int hashCode() { - return name.hashCode(); + return Objects.hash(name); } } diff --git a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java index ec8d27d29029..b5aeeb11bab4 100644 --- a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java @@ -86,7 +86,7 @@ public void test_withChildren_nonEmpty() @Test public void test_equals() { - EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify(); + EqualsVerifier.forClass(TableDataSource.class).usingGetClass().withNonnullFields("name").verify(); } @Test