From b0c8d7e8bd43d70663389812e6ae40488f72dc93 Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 26 Nov 2015 23:52:20 +0530 Subject: [PATCH 1/2] Remove ServerView from RealtimeIndexTasks and use coordinator http endpoint for handoffs - fixes #1970 - extracted out segment handoff callbacks in SegmentHandoffNotifier which is responsible for tracking segment handoffs and doing callbacks when handoff is complete. - Coordinator now maintains a view of segments in the cluster, this will affect the jam heap requirements for the overlord for large clusters. realtime index task and nodes now use HTTP end points exposed by the coordinator to get serverView review comment fix realtime node guide injection review comments make test not rely on scheduled exec fix compilation fix import review comment introduce immutableSegmentLoadInfo fix son reading remove unnecessary logging --- .../io/druid/timeline/TimelineLookup.java | 11 + .../timeline/VersionedIntervalTimeline.java | 12 + docs/content/configuration/index.md | 10 + .../configuration/production-cluster.md | 2 + .../io/druid/indexing/common/TaskToolbox.java | 19 +- .../indexing/common/TaskToolboxFactory.java | 14 +- .../common/task/RealtimeIndexTask.java | 2 +- .../indexing/common/TaskToolboxTest.java | 17 +- .../indexing/common/task/IndexTaskTest.java | 27 +- .../common/task/RealtimeIndexTaskTest.java | 60 ++- .../IngestSegmentFirehoseFactoryTest.java | 9 +- ...estSegmentFirehoseFactoryTimelineTest.java | 8 +- .../indexing/overlord/TaskLifecycleTest.java | 72 ++-- .../worker/WorkerTaskMonitorTest.java | 13 +- .../druid/client/CoordinatorServerView.java | 209 +++++++++ .../client/ImmutableSegmentLoadInfo.java | 96 +++++ .../java/io/druid/client/SegmentLoadInfo.java | 105 +++++ .../druid/client/coordinator/Coordinator.java | 36 ++ .../client/coordinator/CoordinatorClient.java | 125 ++++++ .../CoordinatorSelectorConfig.java | 37 ++ .../guice/CoordinatorDiscoveryModule.java | 50 +++ .../druid/initialization/Initialization.java | 2 + ...oordinatorBasedSegmentHandoffNotifier.java | 172 ++++++++ ...atorBasedSegmentHandoffNotifierConfig.java | 35 ++ ...torBasedSegmentHandoffNotifierFactory.java | 50 +++ .../realtime/plumber/RealtimePlumber.java | 300 ++++++------- .../plumber/RealtimePlumberSchool.java | 13 +- .../plumber/SegmentHandoffNotifier.java | 52 +++ .../SegmentHandoffNotifierFactory.java | 26 ++ .../server/http/DatasourcesResource.java | 67 ++- .../client/CoordinatorServerViewTest.java | 395 ++++++++++++++++++ .../client/ImmutableSegmentLoadInfoTest.java | 66 +++ ...inatorBasedSegmentHandoffNotifierTest.java | 357 ++++++++++++++++ .../plumber/RealtimePlumberSchoolTest.java | 80 ++-- .../server/http/DatasourcesResourceTest.java | 5 +- .../java/io/druid/cli/CliCoordinator.java | 5 + .../src/main/java/io/druid/cli/CliPeon.java | 16 +- .../java/io/druid/guice/RealtimeModule.java | 16 +- 38 files changed, 2287 insertions(+), 304 deletions(-) create mode 100644 server/src/main/java/io/druid/client/CoordinatorServerView.java create mode 100644 server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java create mode 100644 server/src/main/java/io/druid/client/SegmentLoadInfo.java create mode 100644 server/src/main/java/io/druid/client/coordinator/Coordinator.java create mode 100644 server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java create mode 100644 server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java create mode 100644 server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java create mode 100644 server/src/test/java/io/druid/client/CoordinatorServerViewTest.java create mode 100644 server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java create mode 100644 server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java diff --git a/common/src/main/java/io/druid/timeline/TimelineLookup.java b/common/src/main/java/io/druid/timeline/TimelineLookup.java index 89810376fa21..3ebbdfbd8f07 100644 --- a/common/src/main/java/io/druid/timeline/TimelineLookup.java +++ b/common/src/main/java/io/druid/timeline/TimelineLookup.java @@ -35,6 +35,17 @@ public interface TimelineLookup */ public Iterable> lookup(Interval interval); + /** + * Does a lookup for the objects representing the given time interval. Will also return + * incomplete PartitionHolders. + * + * @param interval interval to find objects for + * + * @return Holders representing the interval that the objects exist for, PartitionHolders + * can be incomplete. Holders returned sorted by the interval. + */ + public Iterable> lookupWithIncompletePartitions(Interval interval); + public PartitionHolder findEntry(Interval interval, VersionType version); } diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java index eab26dcee184..0369cded5bd9 100644 --- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java +++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java @@ -191,6 +191,18 @@ public List> lookup(Interval inter } } + @Override + public Iterable> lookupWithIncompletePartitions(Interval interval) + { + try { + lock.readLock().lock(); + return lookup(interval, true); + } + finally { + lock.readLock().unlock(); + } + } + public Set> findOvershadowed() { try { diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 06f7e2ac77c1..efaf2f3b63c5 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -258,6 +258,16 @@ This config is used to find the [Indexing Service](../design/indexing-service.ht |--------|-----------|-------| |`druid.selectors.indexing.serviceName`|The druid.service name of the indexing service Overlord node. To start the Overlord with a different name, set it with this property. |druid/overlord| + +### Coordinator Discovery + +This config is used to find the [Coordinator](../design/coordinator.html) using Curator service discovery. This config is used by the realtime indexing nodes to get information about the segments loaded in the cluster. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.selectors.coordinator.serviceName`|The druid.service name of the coordinator node. To start the Coordinator with a different name, set it with this property. |druid/coordinator| + + ### Announcing Segments You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs. diff --git a/docs/content/configuration/production-cluster.md b/docs/content/configuration/production-cluster.md index 79c690f5fdf0..ab2a5c489c58 100644 --- a/docs/content/configuration/production-cluster.md +++ b/docs/content/configuration/production-cluster.md @@ -63,6 +63,8 @@ druid.cache.readBufferSize=10485760 # Indexing Service Service Discovery druid.selectors.indexing.serviceName=druid:prod:overlord +# Coordinator Service Discovery +druid.selectors.coordinator.serviceName=druid:prod:coordinator ``` ### Overlord Node diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index fedec3a700df..7a619a9decf1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -40,6 +40,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -58,14 +59,14 @@ public class TaskToolbox { private final TaskConfig config; private final Task task; - private final TaskActionClientFactory taskActionClientFactory; + private final TaskActionClient taskActionClient; private final ServiceEmitter emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; private final ExecutorService queryExecutorService; @@ -78,14 +79,14 @@ public class TaskToolbox public TaskToolbox( TaskConfig config, Task task, - TaskActionClientFactory taskActionClientFactory, + TaskActionClient taskActionClient, ServiceEmitter emitter, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, + SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -98,14 +99,14 @@ public TaskToolbox( { this.config = config; this.task = task; - this.taskActionClientFactory = taskActionClientFactory; + this.taskActionClient = taskActionClient; this.emitter = emitter; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -123,7 +124,7 @@ public TaskConfig getConfig() public TaskActionClient getTaskActionClient() { - return taskActionClientFactory.create(task); + return taskActionClient; } public ServiceEmitter getEmitter() @@ -156,9 +157,9 @@ public DataSegmentAnnouncer getSegmentAnnouncer() return segmentAnnouncer; } - public FilteredServerView getNewSegmentServerView() + public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory() { - return newSegmentServerView; + return handoffNotifierFactory; } public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 16628ef345a5..ee62b3b89546 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -21,10 +21,10 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -33,6 +33,7 @@ import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import java.io.File; @@ -51,7 +52,7 @@ public class TaskToolboxFactory private final DataSegmentMover dataSegmentMover; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; @@ -70,7 +71,7 @@ public TaskToolboxFactory( DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, + SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -88,7 +89,7 @@ public TaskToolboxFactory( this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -101,18 +102,17 @@ public TaskToolboxFactory( public TaskToolbox build(Task task) { final File taskWorkDir = config.getTaskWorkDir(task.getId()); - return new TaskToolbox( config, task, - taskActionClientFactory, + taskActionClientFactory.create(task), emitter, segmentPusher, dataSegmentKiller, dataSegmentMover, dataSegmentArchiver, segmentAnnouncer, - newSegmentServerView, + handoffNotifierFactory, queryRunnerFactoryConglomerate, queryExecutorService, monitorScheduler, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 280acd34ed55..c3aa6f480c8b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -285,7 +285,7 @@ public String getVersion(final Interval interval) toolbox.getSegmentPusher(), lockingSegmentAnnouncer, segmentPublisher, - toolbox.getNewSegmentServerView(), + toolbox.getSegmentHandoffNotifierFactory(), toolbox.getQueryExecutorService(), toolbox.getCache(), toolbox.getCacheConfig(), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index ab46bd8ec44a..da4c06d0a114 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -37,6 +36,7 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; @@ -64,7 +64,9 @@ public class TaskToolboxTest private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class); private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class); private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class); - private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class); + private SegmentHandoffNotifierFactory mockHandoffNotifierFactory = EasyMock.createNiceMock( + SegmentHandoffNotifierFactory.class + ); private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class); private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); @@ -82,7 +84,8 @@ public class TaskToolboxTest public void setUp() throws IOException { EasyMock.expect(task.getId()).andReturn("task_id").anyTimes(); - EasyMock.replay(task); + EasyMock.expect(task.getDataSource()).andReturn("task_ds").anyTimes(); + EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), @@ -93,7 +96,7 @@ public void setUp() throws IOException mockDataSegmentMover, mockDataSegmentArchiver, mockSegmentAnnouncer, - mockNewSegmentServerView, + mockHandoffNotifierFactory, mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, @@ -116,12 +119,6 @@ public void testGetSegmentAnnouncer() Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer()); } - @Test - public void testGetNewSegmentServerView() - { - Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView()); - } - @Test public void testGetQueryRunnerFactoryConglomerate() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index d25b3fab60b5..be0e74f4d723 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -208,26 +208,19 @@ private final List runTask(final IndexTask indexTask) throws Except indexTask.run( new TaskToolbox( - null, null, new TaskActionClientFactory() + null, null, new TaskActionClient() { @Override - public TaskActionClient create(Task task) + public RetType submit(TaskAction taskAction) throws IOException { - return new TaskActionClient() - { - @Override - public RetType submit(TaskAction taskAction) throws IOException - { - if (taskAction instanceof LockListAction) { - return (RetType) Arrays.asList( - new TaskLock( - "", "", null, new DateTime().toString() - ) - ); - } - return null; - } - }; + if (taskAction instanceof LockListAction) { + return (RetType) Arrays.asList( + new TaskLock( + "", "", null, new DateTime().toString() + ) + ); + } + return null; } }, null, new DataSegmentPusher() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 49b62dcd4513..a5399e076331 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; @@ -60,7 +62,6 @@ import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.indexing.test.TestDataSegmentPusher; import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; -import io.druid.indexing.test.TestServerView; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; @@ -72,6 +73,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -90,9 +92,10 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.metrics.EventReceiverFirehoseRegister; -import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -107,7 +110,9 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; public class RealtimeIndexTaskTest { @@ -136,6 +141,8 @@ public class RealtimeIndexTaskTest private DateTime now; private ListeningExecutorService taskExec; + private Map> handOffCallbacks; + private SegmentHandoffNotifierFactory handoffNotifierFactory; @Before public void setUp() @@ -204,9 +211,10 @@ public void testBasics() throws Exception Assert.assertEquals(2, countEvents(task)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -294,9 +302,10 @@ public void testRestore() throws Exception Assert.assertEquals(2, countEvents(task2)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -489,6 +498,43 @@ public void registerQuery(Query query, ListenableFuture future) ) ) ); + + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() + { + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new SegmentHandoffNotifier() + { + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + return true; + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; + } + }; final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( taskConfig, taskActionClientFactory, @@ -498,7 +544,7 @@ public void registerQuery(Query query, ListenableFuture future) null, // DataSegmentMover null, // DataSegmentArchiver new TestDataSegmentAnnouncer(), - new TestServerView(), + handoffNotifierFactory, conglomerate, MoreExecutors.sameThreadExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index fe418f9ca844..26b90592a6a8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -75,8 +75,10 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.AfterClass; import org.junit.Assert; @@ -180,6 +182,9 @@ public void deleteSegments(Set segments) new TaskActionToolbox(tl, mdc, newMockEmitter()) ); final ObjectMapper objectMapper = newObjectMapper(); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(notifierFactory); + final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), tac, @@ -230,7 +235,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException } }, null, // segment announcer - null, // new segment server view + notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler @@ -500,5 +505,7 @@ public void emit(ServiceEventBuilder builder) } }; + + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index d49910b68704..204224722d2c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -60,10 +60,12 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; import org.apache.commons.io.FileUtils; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -281,6 +283,8 @@ public RetType submit(TaskAction taskAction) throws IOExcepti } } }; + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), new TaskActionClientFactory() @@ -296,8 +300,8 @@ public TaskActionClient create(Task task) null, // segment killer null, // segment mover null, // segment archiver - null, // segment announcer - null, // new segment server view + null, // segment announcer, + notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 843500682ede..4f8aeb058770 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -25,16 +25,17 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Event; @@ -42,8 +43,6 @@ import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -60,6 +59,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; @@ -68,7 +68,6 @@ import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; -import io.druid.indexing.common.task.RealtimeIndexTaskTest; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; @@ -77,6 +76,7 @@ import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -95,8 +95,9 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentTest; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -115,7 +116,6 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -184,7 +184,6 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private TaskToolboxFactory tb = null; private IndexSpec indexSpec; private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; - private FilteredServerView serverView; private MonitorScheduler monitorScheduler; private ServiceEmitter emitter; private TaskQueueConfig tqc; @@ -192,7 +191,9 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private int announcedSinks; private static CountDownLatch publishCountDown; private TestDerbyConnector testDerbyConnector; - private List segmentCallbacks = new ArrayList<>(); + private SegmentHandoffNotifierFactory handoffNotifierFactory; + private Map> handOffCallbacks; + private static TestIndexerMetadataStorageCoordinator newMockMDC() { @@ -380,15 +381,42 @@ public void setUp() throws Exception } else { throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); } - - serverView = new FilteredServerView() + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() { @Override - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ) + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { - segmentCallbacks.add(callback); + return new SegmentHandoffNotifier() + { + + + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + return true; + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; } }; setUpAndStartTaskQueue( @@ -472,7 +500,7 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } }, // segment announcer - serverView, // new segment server view + handoffNotifierFactory, queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective null, // query executor service monitorScheduler, // monitor scheduler @@ -840,16 +868,10 @@ public void testRealtimeIndexTask() throws Exception Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS)); // Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status - segmentCallbacks.get(0).segmentAdded( - new DruidServerMetadata( - "dummy", - "dummy_host", - 0, - "historical", - "dummy_tier", - 0 - ), mdc.getPublished().iterator().next() - ); + Assert.assertEquals(1, handOffCallbacks.size()); + Pair executorRunnablePair = Iterables.getOnlyElement(handOffCallbacks.values()); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + handOffCallbacks.clear(); // Wait for realtime index task to handle callback in plumber and succeed while (tsqa.getStatus(taskId).get().isRunnable()) { diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 76208d8ab729..744a908159fa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -30,7 +30,10 @@ import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; @@ -38,6 +41,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -45,6 +49,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; +import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -125,6 +130,11 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); + TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); + EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes(); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory); return new WorkerTaskMonitor( jsonMapper, cf, @@ -132,7 +142,8 @@ private WorkerTaskMonitor createTaskMonitor() new ThreadPoolTaskRunner( new TaskToolboxFactory( taskConfig, - null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( + taskActionClientFactory, + null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java new file mode 100644 index 000000000000..e61d7a309dc3 --- /dev/null +++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java @@ -0,0 +1,209 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; +import io.druid.query.DataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * ServerView of coordinator for the state of segments being loaded in the cluster. + */ +public class CoordinatorServerView implements InventoryView +{ + private static final Logger log = new Logger(CoordinatorServerView.class); + + private final Object lock = new Object(); + + private final Map segmentLoadInfos; + private final Map> timelines; + + private final ServerInventoryView baseView; + + private volatile boolean initialized = false; + + @Inject + public CoordinatorServerView( + ServerInventoryView baseView + ) + { + this.baseView = baseView; + this.segmentLoadInfos = Maps.newHashMap(); + this.timelines = Maps.newHashMap(); + + ExecutorService exec = Execs.singleThreaded("CoordinatorServerView-%s"); + baseView.registerSegmentCallback( + exec, + new ServerView.SegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + serverAddedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment) + { + serverRemovedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentViewInitialized() + { + initialized = true; + return ServerView.CallbackAction.CONTINUE; + } + } + ); + + baseView.registerServerCallback( + exec, + new ServerView.ServerCallback() + { + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + removeServer(server); + return ServerView.CallbackAction.CONTINUE; + } + } + ); + } + + public boolean isInitialized() + { + return initialized; + } + + public void clear() + { + synchronized (lock) { + timelines.clear(); + segmentLoadInfos.clear(); + } + } + + private void removeServer(DruidServer server) + { + for (DataSegment segment : server.getSegments().values()) { + serverRemovedSegment(server.getMetadata(), segment); + } + } + + private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) + { + String segmentId = segment.getIdentifier(); + synchronized (lock) { + log.debug("Adding segment[%s] for server[%s]", segment, server); + + SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId); + if (segmentLoadInfo == null) { + // servers escape the scope of this object so use ConcurrentSet + segmentLoadInfo = new SegmentLoadInfo(segment); + + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + timelines.put(segment.getDataSource(), timeline); + } + + timeline.add( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(segmentLoadInfo) + ); + segmentLoadInfos.put(segmentId, segmentLoadInfo); + } + segmentLoadInfo.addServer(server); + } + } + + private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) + { + String segmentId = segment.getIdentifier(); + + + synchronized (lock) { + log.debug("Removing segment[%s] from server[%s].", segmentId, server); + + final SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId); + if (segmentLoadInfo == null) { + log.warn("Told to remove non-existant segment[%s]", segmentId); + return; + } + segmentLoadInfo.removeServer(server); + if (segmentLoadInfo.isEmpty()) { + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + segmentLoadInfos.remove(segmentId); + + final PartitionChunk removedPartition = timeline.remove( + segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk( + new SegmentLoadInfo( + segment + ) + ) + ); + + if (removedPartition == null) { + log.warn( + "Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", + segment.getInterval(), + segment.getVersion() + ); + } + } + } + } + + public VersionedIntervalTimeline getTimeline(DataSource dataSource) + { + String table = Iterables.getOnlyElement(dataSource.getNames()); + synchronized (lock) { + return timelines.get(table); + } + } + + + @Override + public DruidServer getInventoryValue(String string) + { + return baseView.getInventoryValue(string); + } + + @Override + public Iterable getInventory() + { + return baseView.getInventory(); + } +} diff --git a/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java b/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java new file mode 100644 index 000000000000..0eebd6766c7e --- /dev/null +++ b/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java @@ -0,0 +1,96 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.Set; + +public class ImmutableSegmentLoadInfo +{ + private final DataSegment segment; + private final ImmutableSet servers; + + @JsonCreator + public ImmutableSegmentLoadInfo( + @JsonProperty("segment") DataSegment segment, + @JsonProperty("servers") Set servers + ) + { + Preconditions.checkNotNull(segment, "segment"); + Preconditions.checkNotNull(servers, "servers"); + this.segment = segment; + this.servers = ImmutableSet.copyOf(servers); + } + + @JsonProperty("segment") + public DataSegment getSegment() + { + return segment; + } + + @JsonProperty("servers") + public ImmutableSet getServers() + { + return servers; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ImmutableSegmentLoadInfo that = (ImmutableSegmentLoadInfo) o; + + if (!segment.equals(that.segment)) { + return false; + } + return servers.equals(that.servers); + + } + + @Override + public int hashCode() + { + int result = segment.hashCode(); + result = 31 * result + servers.hashCode(); + return result; + } + + @Override + public String toString() + { + return "SegmentLoadInfo{" + + "segment=" + segment + + ", servers=" + servers + + '}'; + } +} diff --git a/server/src/main/java/io/druid/client/SegmentLoadInfo.java b/server/src/main/java/io/druid/client/SegmentLoadInfo.java new file mode 100644 index 000000000000..6dac43733d9c --- /dev/null +++ b/server/src/main/java/io/druid/client/SegmentLoadInfo.java @@ -0,0 +1,105 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Set; + +public class SegmentLoadInfo +{ + private final DataSegment segment; + private final Set servers; + + public SegmentLoadInfo(DataSegment segment) + { + Preconditions.checkNotNull(segment, "segment"); + this.segment = segment; + this.servers = Sets.newConcurrentHashSet(); + } + + public DataSegment getSegment() + { + return segment; + } + + public boolean addServer(DruidServerMetadata server) + { + return servers.add(server); + } + + public boolean removeServer(DruidServerMetadata server) + { + return servers.remove(server); + } + + public boolean isEmpty() + { + return servers.isEmpty(); + } + + public ImmutableSegmentLoadInfo toImmutableSegmentLoadInfo() + { + return new ImmutableSegmentLoadInfo(segment, servers); + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SegmentLoadInfo that = (SegmentLoadInfo) o; + + if (!segment.equals(that.segment)) { + return false; + } + return servers.equals(that.servers); + + } + + @Override + public int hashCode() + { + int result = segment.hashCode(); + result = 31 * result + servers.hashCode(); + return result; + } + + @Override + public String toString() + { + return "SegmentLoadInfo{" + + "segment=" + segment + + ", servers=" + servers + + '}'; + } +} diff --git a/server/src/main/java/io/druid/client/coordinator/Coordinator.java b/server/src/main/java/io/druid/client/coordinator/Coordinator.java new file mode 100644 index 000000000000..3577b73854a2 --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/Coordinator.java @@ -0,0 +1,36 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Coordinator +{ +} diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java new file mode 100644 index 000000000000..0cd6f4e139ed --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -0,0 +1,125 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.annotations.Global; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; + +import java.net.URI; +import java.net.URL; +import java.util.List; + +public class CoordinatorClient +{ + private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); + + private final HttpClient client; + private final ObjectMapper jsonMapper; + private final ServerDiscoverySelector selector; + + @Inject + public CoordinatorClient( + @Global HttpClient client, + ObjectMapper jsonMapper, + @Coordinator ServerDiscoverySelector selector + ) + { + this.client = client; + this.jsonMapper = jsonMapper; + this.selector = selector; + } + + + public List fetchServerView(String dataSource, Interval interval, boolean incompleteOk) + { + try { + StatusResponseHolder response = client.go( + new Request( + HttpMethod.GET, + new URL( + String.format( + "%s/datasources/%s/intervals/%s/serverview?partial=%s", + baseUrl(), + dataSource, + interval.toString().replace("/", "_"), + incompleteOk + ) + ) + ), + RESPONSE_HANDLER + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching serverView status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), new TypeReference>() + { + + } + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + + private String baseUrl() + { + try { + final Server instance = selector.pick(); + if (instance == null) { + throw new ISE("Cannot find instance of coordinator"); + } + + return new URI( + instance.getScheme(), + null, + instance.getAddress(), + instance.getPort(), + "/druid/coordinator/v1", + null, + null + ).toString(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java new file mode 100644 index 000000000000..5198c0576d94 --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java @@ -0,0 +1,37 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class CoordinatorSelectorConfig +{ + public static final String DEFAULT_SERVICE_NAME = "druid/coordinator"; + + @JsonProperty + private String serviceName = DEFAULT_SERVICE_NAME; + + public String getServiceName() + { + return serviceName; + } +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java new file mode 100644 index 000000000000..1d2e1a85b0c3 --- /dev/null +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import io.druid.client.coordinator.Coordinator; +import io.druid.client.coordinator.CoordinatorSelectorConfig; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; + +/** + */ +public class CoordinatorDiscoveryModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class); + } + + @Provides + @Coordinator + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( + CoordinatorSelectorConfig config, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + return serverDiscoveryFactory.createSelector(config.getServiceName()); + } +} diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 80056793d51f..cede278cfbf4 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -34,6 +34,7 @@ import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; +import io.druid.guice.CoordinatorDiscoveryModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; import io.druid.guice.ExtensionsConfig; @@ -391,6 +392,7 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter new DerbyMetadataStorageDruidModule(), new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), + new CoordinatorDiscoveryModule(), new LocalDataStorageDruidModule(), new FirehoseModule(), new ParsersModule() diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java new file mode 100644 index 000000000000..f27d8b8537ce --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -0,0 +1,172 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.client.coordinator.CoordinatorClient; +import io.druid.concurrent.Execs; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier +{ + private static final Logger log = new Logger(CoordinatorBasedSegmentHandoffNotifier.class); + + private final ConcurrentMap> handOffCallbacks = Maps.newConcurrentMap(); + private final CoordinatorClient coordinatorClient; + private volatile ScheduledExecutorService scheduledExecutor; + private final long pollDurationMillis; + private final String dataSource; + + public CoordinatorBasedSegmentHandoffNotifier( + String dataSource, + CoordinatorClient coordinatorClient, + CoordinatorBasedSegmentHandoffNotifierConfig config + ) + { + this.dataSource = dataSource; + this.coordinatorClient = coordinatorClient; + this.pollDurationMillis = config.getPollDuration().getMillis(); + } + + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + log.info("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", dataSource, descriptor); + Pair prev = handOffCallbacks.putIfAbsent( + descriptor, + new Pair<>(exec, handOffRunnable) + ); + return prev == null; + } + + @Override + public void start() + { + scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d"); + scheduledExecutor.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + checkForSegmentHandoffs(); + } + }, 0L, pollDurationMillis, TimeUnit.MILLISECONDS + ); + } + + void checkForSegmentHandoffs() + { + try { + Iterator>> itr = handOffCallbacks.entrySet() + .iterator(); + while (itr.hasNext()) { + Map.Entry> entry = itr.next(); + SegmentDescriptor descriptor = entry.getKey(); + try { + List loadedSegments = coordinatorClient.fetchServerView( + dataSource, + descriptor.getInterval(), + true + ); + + if (isHandOffComplete(loadedSegments, entry.getKey())) { + log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor); + entry.getValue().lhs.execute(entry.getValue().rhs); + itr.remove(); + } + } + catch (Exception e) { + log.error( + e, + "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs", + dataSource, + descriptor, + pollDurationMillis + ); + } + } + if (!handOffCallbacks.isEmpty()) { + log.info("Still waiting for Handoff for Segments : [%s]", handOffCallbacks.keySet()); + } + } + catch (Throwable t) { + log.error( + t, + "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs", + dataSource, + pollDurationMillis + ); + } + } + + + static boolean isHandOffComplete(List serverView, SegmentDescriptor descriptor) + { + for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { + if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() + == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 + && Iterables.any( + segmentLoadInfo.getServers(), new Predicate() + { + @Override + public boolean apply(DruidServerMetadata input) + { + return input.isAssignable(); + } + } + )) { + return true; + } + } + return false; + } + + @Override + public void stop() + { + scheduledExecutor.shutdown(); + } + + // Used in tests + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java new file mode 100644 index 000000000000..ab08a3a7d1bb --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java @@ -0,0 +1,35 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; +import org.joda.time.Period; + +public class CoordinatorBasedSegmentHandoffNotifierConfig +{ + @JsonProperty + public Duration pollDuration = new Period("PT1M").toStandardDuration(); + + public Duration getPollDuration() + { + return pollDuration; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..3b96949754c4 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.inject.Inject; +import io.druid.client.coordinator.CoordinatorClient; + +public class CoordinatorBasedSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory +{ + + private final CoordinatorClient client; + private final CoordinatorBasedSegmentHandoffNotifierConfig config; + + @Inject + public CoordinatorBasedSegmentHandoffNotifierFactory( + CoordinatorClient client, + CoordinatorBasedSegmentHandoffNotifierConfig config + ) + { + this.client = client; + this.config = config; + } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new CoordinatorBasedSegmentHandoffNotifier( + dataSource, + client, + config + ); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index e5ba74fb938d..bb169fb5747c 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -41,8 +40,6 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingQueryRunner; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; @@ -77,7 +74,6 @@ import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -118,7 +114,7 @@ public class RealtimePlumber implements Plumber private final ExecutorService queryExecutorService; private final DataSegmentPusher dataSegmentPusher; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifier handoffNotifier; private final Object handoffCondition = new Object(); private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( @@ -152,7 +148,7 @@ public RealtimePlumber( ExecutorService queryExecutorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, - FilteredServerView serverView, + SegmentHandoffNotifier handoffNotifier, Cache cache, CacheConfig cacheConfig, ObjectMapper objectMapper @@ -168,12 +164,12 @@ public RealtimePlumber( this.queryExecutorService = queryExecutorService; this.dataSegmentPusher = dataSegmentPusher; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifier = handoffNotifier; this.cache = cache; this.cacheConfig = cacheConfig; this.objectMapper = objectMapper; - if(!cache.isLocal()) { + if (!cache.isLocal()) { log.error("Configured cache is not local, caching will not be enabled"); } @@ -205,8 +201,8 @@ public Object startJob() { computeBaseDir(schema).mkdirs(); initializeExecutors(); + handoffNotifier.start(); Object retVal = bootstrapSinksFromDisk(); - registerServerViewCallback(); startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); @@ -251,17 +247,8 @@ private Sink getSink(long timestamp) ); retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval)); + addSink(retVal); - try { - segmentAnnouncer.announceSegment(retVal.getSegment()); - sinks.put(truncatedTime, retVal); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); - } - catch (IOException e) { - log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) - .addData("interval", retVal.getInterval()) - .emit(); - } } return retVal; @@ -591,6 +578,17 @@ public void doRun() } } ); + handoffNotifier.registerSegmentHandoffCallback( + new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()), + mergeExecutor, new Runnable() + { + @Override + public void run() + { + abandonSegment(sink.getInterval().getStartMillis(), sink); + } + } + ); } @Override @@ -634,6 +632,7 @@ public String apply(Sink input) } } + handoffNotifier.stop(); shutdownExecutors(); stopped = true; @@ -672,11 +671,11 @@ protected void initializeExecutors() protected void shutdownExecutors() { - // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the - // ServerView sends it a new segment callback + // scheduledExecutor is shutdown here if (scheduledExecutor != null) { scheduledExecutor.shutdown(); persistExecutor.shutdown(); + mergeExecutor.shutdown(); } } @@ -697,7 +696,7 @@ protected Object bootstrapSinksFromDisk() Object metadata = null; long latestCommitTime = 0; for (File sinkDir : files) { - Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); + final Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); //final File[] sinkFiles = sinkDir.listFiles(); // To avoid reading and listing of "merged" dir @@ -729,18 +728,25 @@ public int compare(File o1, File o2) } ); boolean isCorrupted = false; - try { - List hydrants = Lists.newArrayList(); - for (File segmentDir : sinkFiles) { - log.info("Loading previously persisted segment at [%s]", segmentDir); - - // Although this has been tackled at start of this method. - // Just a doubly-check added to skip "merged" dir. from being added to hydrants - // If 100% sure that this is not needed, this check can be removed. - if (Ints.tryParse(segmentDir.getName()) == null) { - continue; - } - QueryableIndex queryableIndex = null; + List hydrants = Lists.newArrayList(); + for (File segmentDir : sinkFiles) { + log.info("Loading previously persisted segment at [%s]", segmentDir); + + // Although this has been tackled at start of this method. + // Just a doubly-check added to skip "merged" dir. from being added to hydrants + // If 100% sure that this is not needed, this check can be removed. + if (Ints.tryParse(segmentDir.getName()) == null) { + continue; + } + QueryableIndex queryableIndex = null; + try { + queryableIndex = IndexIO.loadIndex(segmentDir); + } + catch (IOException e) { + log.error(e, "Problem loading segmentDir from disk."); + isCorrupted = true; + } + if (isCorrupted) { try { queryableIndex = IndexIO.loadIndex(segmentDir); } @@ -748,78 +754,76 @@ public int compare(File o1, File o2) log.error(e, "Problem loading segmentDir from disk."); isCorrupted = true; } - if (isCorrupted) { - try { - File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); - log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); - FileUtils.copyDirectory(segmentDir, corruptSegmentDir); - FileUtils.deleteDirectory(segmentDir); - } - catch (Exception e1) { - log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); - } - //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed - //at some point. - continue; + catch (Exception e1) { + log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); } - Map segmentMetadata = queryableIndex.getMetaData(); - if (segmentMetadata != null) { - Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); - if (timestampObj != null) { - long timestamp = ((Long) timestampObj).longValue(); - if (timestamp > latestCommitTime) { - log.info( - "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", - queryableIndex.getMetaData(), timestamp, latestCommitTime - ); - latestCommitTime = timestamp; - metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); - } + //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed + //at some point. + continue; + } + Map segmentMetadata = queryableIndex.getMetaData(); + if (segmentMetadata != null) { + Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); + if (timestampObj != null) { + long timestamp = ((Long) timestampObj).longValue(); + if (timestamp > latestCommitTime) { + log.info( + "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", + queryableIndex.getMetaData(), timestamp, latestCommitTime + ); + latestCommitTime = timestamp; + metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); } } - hydrants.add( - new FireHydrant( - new QueryableIndexSegment( - DataSegment.makeDataSegmentIdentifier( - schema.getDataSource(), - sinkInterval.getStart(), - sinkInterval.getEnd(), - versioningPolicy.getVersion(sinkInterval), - config.getShardSpec() - ), - queryableIndex - ), - Integer.parseInt(segmentDir.getName()) - ) - ); } - if (hydrants.isEmpty()) { - // Probably encountered a corrupt sink directory - log.warn( - "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", - sinkDir.getAbsolutePath() - ); - continue; - } - Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); - sinks.put(sinkInterval.getStartMillis(), currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - new SingleElementPartitionChunk(currSink) + hydrants.add( + new FireHydrant( + new QueryableIndexSegment( + DataSegment.makeDataSegmentIdentifier( + schema.getDataSource(), + sinkInterval.getStart(), + sinkInterval.getEnd(), + versioningPolicy.getVersion(sinkInterval), + config.getShardSpec() + ), + queryableIndex + ), + Integer.parseInt(segmentDir.getName()) + ) ); - - segmentAnnouncer.announceSegment(currSink.getSegment()); } - catch (IOException e) { - log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) - .addData("interval", sinkInterval) - .emit(); + if (hydrants.isEmpty()) { + // Probably encountered a corrupt sink directory + log.warn( + "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", + sinkDir.getAbsolutePath() + ); + continue; } + final Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); + addSink(currSink); } return metadata; } + private void addSink(final Sink sink) + { + sinks.put(sink.getInterval().getStartMillis(), sink); + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk(sink) + ); + try { + segmentAnnouncer.announceSegment(sink.getSegment()); + } + catch (IOException e) { + log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + protected void startPersistThread() { final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); @@ -933,28 +937,30 @@ private void mergeAndPush() */ protected void abandonSegment(final long truncatedTime, final Sink sink) { - try { - segmentAnnouncer.unannounceSegment(sink.getSegment()); - removeSegment(sink, computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); - sinks.remove(truncatedTime); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - new SingleElementPartitionChunk<>(sink) - ); - for (FireHydrant hydrant : sink) { - cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment())); + if (sinks.containsKey(truncatedTime)) { + try { + segmentAnnouncer.unannounceSegment(sink.getSegment()); + removeSegment(sink, computePersistDir(schema, sink.getInterval())); + log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); + sinks.remove(truncatedTime); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk<>(sink) + ); + for (FireHydrant hydrant : sink) { + cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment())); + } + synchronized (handoffCondition) { + handoffCondition.notifyAll(); + } } - synchronized (handoffCondition) { - handoffCondition.notifyAll(); + catch (Exception e) { + log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); } } - catch (Exception e) { - log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .emit(); - } } protected File computeBaseDir(DataSchema schema) @@ -1048,72 +1054,6 @@ protected int persistHydrant( } } - private void registerServerViewCallback() - { - serverView.registerSegmentCallback( - mergeExecutor, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) - { - if (stopped) { - log.info("Unregistering ServerViewCallback"); - mergeExecutor.shutdown(); - return ServerView.CallbackAction.UNREGISTER; - } - - if (!server.isAssignable()) { - return ServerView.CallbackAction.CONTINUE; - } - - log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - ) { - final Interval interval = segment.getInterval(); - for (Map.Entry entry : sinks.entrySet()) { - final Long sinkKey = entry.getKey(); - if (interval.contains(sinkKey)) { - final Sink sink = entry.getValue(); - log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server); - - final String segmentVersion = segment.getVersion(); - final String sinkVersion = sink.getSegment().getVersion(); - if (segmentVersion.compareTo(sinkVersion) >= 0) { - log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - abandonSegment(sinkKey, sink); - } - } - } - } - - return ServerView.CallbackAction.CONTINUE; - } - }, - new Predicate() - { - @Override - public boolean apply(final DataSegment segment) - { - return - schema.getDataSource().equalsIgnoreCase(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - && Iterables.any( - sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } - ); - } - } - ); - } - private void removeSegment(final Sink sink, final File target) { if (target.exists()) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 68113727929d..34dcf9fd67b0 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; @@ -45,7 +44,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final DataSegmentPusher dataSegmentPusher; private final DataSegmentAnnouncer segmentAnnouncer; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final ExecutorService queryExecutorService; private final Cache cache; private final CacheConfig cacheConfig; @@ -58,19 +57,19 @@ public RealtimePlumberSchool( @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject SegmentPublisher segmentPublisher, - @JacksonInject FilteredServerView serverView, + @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, @JacksonInject @Processing ExecutorService executorService, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, @JacksonInject ObjectMapper objectMapper - ) + ) { this.emitter = emitter; this.conglomerate = conglomerate; this.dataSegmentPusher = dataSegmentPusher; this.segmentAnnouncer = segmentAnnouncer; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryExecutorService = executorService; this.cache = cache; @@ -97,7 +96,7 @@ public Plumber findPlumber( queryExecutorService, dataSegmentPusher, segmentPublisher, - serverView, + handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), cache, cacheConfig, objectMapper @@ -110,7 +109,7 @@ private void verifyState() Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action."); Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action."); Preconditions.checkNotNull(segmentPublisher, "must specify a segmentPublisher to do this action."); - Preconditions.checkNotNull(serverView, "must specify a serverView to do this action."); + Preconditions.checkNotNull(handoffNotifierFactory, "must specify a handoffNotifierFactory to do this action."); Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action."); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java new file mode 100644 index 000000000000..fbccba57bcdc --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java @@ -0,0 +1,52 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import io.druid.query.SegmentDescriptor; + +import java.util.concurrent.Executor; + +public interface SegmentHandoffNotifier +{ + /** + * register a handOffCallback to be called when segment handoff is complete. + * + * @param descriptor segment descriptor for the segment for which handoffCallback is requested + * @param exec executor used to call the runnable + * @param handOffRunnable runnable to be called when segment handoff is complete + */ + boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, + Executor exec, + Runnable handOffRunnable + ); + + /** + * Perform any initial setup. Should be called before using any other methods, and should be paired + * with a corresponding call to {@link #stop()}. + */ + void start(); + + /** + * Perform any final processing and clean up after ourselves. + */ + void stop(); + +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..454487770ed0 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java @@ -0,0 +1,26 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + + +public interface SegmentHandoffNotifierFactory +{ + SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource); +} diff --git a/server/src/main/java/io/druid/server/http/DatasourcesResource.java b/server/src/main/java/io/druid/server/http/DatasourcesResource.java index e77b240a327b..ba494201fab8 100644 --- a/server/src/main/java/io/druid/server/http/DatasourcesResource.java +++ b/server/src/main/java/io/druid/server/http/DatasourcesResource.java @@ -27,12 +27,20 @@ import com.metamx.common.MapUtils; import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.common.logger.Logger; +import io.druid.client.CoordinatorServerView; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; -import io.druid.client.InventoryView; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; import io.druid.client.indexing.IndexingServiceClient; import io.druid.metadata.MetadataSegmentManager; +import io.druid.query.TableDataSource; import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.PartitionChunk; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -58,13 +66,15 @@ @Path("/druid/coordinator/v1/datasources") public class DatasourcesResource { - private final InventoryView serverInventoryView; + private static final Logger log = new Logger(DatasourcesResource.class); + + private final CoordinatorServerView serverInventoryView; private final MetadataSegmentManager databaseSegmentManager; private final IndexingServiceClient indexingServiceClient; @Inject public DatasourcesResource( - InventoryView serverInventoryView, + CoordinatorServerView serverInventoryView, MetadataSegmentManager databaseSegmentManager, @Nullable IndexingServiceClient indexingServiceClient ) @@ -573,4 +583,55 @@ private Map> getSimpleDatasource(String dataSourceNa segments.put("maxTime", new DateTime(maxTime)); return retVal; } + + /** + * Provides serverView for a datasource and Interval which gives details about servers hosting segments for an interval + * Used by the realtime tasks to fetch a view of the interval they are interested in. + */ + @GET + @Path("/{dataSourceName}/intervals/{interval}/serverview") + @Produces(MediaType.APPLICATION_JSON) + public Response getSegmentDataSourceSpecificInterval( + @PathParam("dataSourceName") String dataSourceName, + @PathParam("interval") String interval, + @QueryParam("partial") final boolean partial + ) + { + TimelineLookup timeline = serverInventoryView.getTimeline( + new TableDataSource(dataSourceName) + ); + final Interval theInterval = new Interval(interval.replace("_", "/")); + if (timeline == null) { + log.debug("No timeline found for datasource[%s]", dataSourceName); + return Response.ok(Lists.newArrayList()).build(); + } + + Iterable> lookup = timeline.lookupWithIncompletePartitions(theInterval); + FunctionalIterable retval = FunctionalIterable + .create(lookup).transformCat( + new Function, Iterable>() + { + @Override + public Iterable apply( + TimelineObjectHolder input + ) + { + return Iterables.transform( + input.getObject(), + new Function, ImmutableSegmentLoadInfo>() + { + @Override + public ImmutableSegmentLoadInfo apply( + PartitionChunk chunk + ) + { + return chunk.getObject().toImmutableSegmentLoadInfo(); + } + } + ); + } + } + ); + return Response.ok(retval).build(); + } } diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java new file mode 100644 index 000000000000..13def515cf3a --- /dev/null +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import io.druid.curator.CuratorTestBase; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.TableDataSource; +import io.druid.segment.Segment; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.PartitionHolder; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +public class CoordinatorServerViewTest extends CuratorTestBase +{ + private final ObjectMapper jsonMapper; + private final ZkPathsConfig zkPathsConfig; + private final String announcementsPath; + private final String inventoryPath; + + private CountDownLatch segmentViewInitLatch; + private CountDownLatch segmentAddedLatch; + private CountDownLatch segmentRemovedLatch; + + private ServerInventoryView baseView; + private CoordinatorServerView overlordServerView; + + public CoordinatorServerViewTest() + { + jsonMapper = new DefaultObjectMapper(); + zkPathsConfig = new ZkPathsConfig(); + announcementsPath = zkPathsConfig.getAnnouncementsPath(); + inventoryPath = zkPathsConfig.getLiveSegmentsPath(); + } + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + curator.start(); + } + + @Test + public void testSingleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(1); + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final DruidServer druidServer = new DruidServer( + "localhost:1234", + "localhost:1234", + 10000000L, + "historical", + "default_tier", + 0 + ); + + setupZNodeForServer(druidServer); + + final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); + announceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + List serverLookupRes = (List) timeline.lookup( + new Interval( + "2014-10-20T00:00:00Z/P1D" + ) + ); + Assert.assertEquals(1, serverLookupRes.size()); + + TimelineObjectHolder actualTimelineObjectHolder = serverLookupRes.get(0); + Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); + Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); + + PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(segmentLoadInfo.isEmpty()); + Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + + unannounceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size() + ); + Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1")); + } + + @Test + public void testMultipleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(5); + + // temporarily set latch count to 1 + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final List druidServers = Lists.transform( + ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + new Function() + { + @Override + public DruidServer apply(String input) + { + return new DruidServer( + input, + input, + 10000000L, + "historical", + "default_tier", + 0 + ); + } + } + ); + + for (DruidServer druidServer : druidServers) { + setupZNodeForServer(druidServer); + } + + final List segments = Lists.transform( + ImmutableList.>of( + Pair.of("2011-04-01/2011-04-03", "v1"), + Pair.of("2011-04-03/2011-04-06", "v1"), + Pair.of("2011-04-01/2011-04-09", "v2"), + Pair.of("2011-04-06/2011-04-09", "v3"), + Pair.of("2011-04-01/2011-04-02", "v3") + ), new Function, DataSegment>() + { + @Override + public DataSegment apply(Pair input) + { + return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs); + } + } + ); + + for (int i = 0; i < 5; ++i) { + announceSegmentForServer(druidServers.get(i), segments.get(i)); + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") + unannounceSegmentForServer(druidServers.get(2), segments.get(2)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + // renew segmentRemovedLatch since we still have 4 segments to unannounce + segmentRemovedLatch = new CountDownLatch(4); + + timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)), + createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce all the segments + for (int i = 0; i < 5; ++i) { + // skip the one that was previously unannounced + if (i != 2) { + unannounceSegmentForServer(druidServers.get(i), segments.get(i)); + } + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size() + ); + } + + private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.create() + .compressed() + .withMode(CreateMode.EPHEMERAL) + .forPath( + ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), + jsonMapper.writeValueAsBytes( + ImmutableSet.of(segment) + ) + ); + } + + private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.delete().guaranteed().forPath( + ZKPaths.makePath( + ZKPaths.makePath(inventoryPath, druidServer.getHost()), + segment.getIdentifier() + ) + ); + } + + private Pair>> createExpected( + String intervalStr, + String version, + DruidServer druidServer, + DataSegment segment + ) + { + return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment))); + } + + private void assertValues( + List>>> expected, List actual + ) + { + Assert.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < expected.size(); ++i) { + Pair>> expectedPair = expected.get(i); + TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); + + Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); + Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); + + PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(segmentLoadInfo.isEmpty()); + Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + } + } + + private void setupViews() throws Exception + { + baseView = new BatchServerInventoryView( + zkPathsConfig, + curator, + jsonMapper, + Predicates.alwaysTrue() + ) + { + @Override + public void registerSegmentCallback(Executor exec, final SegmentCallback callback) + { + super.registerSegmentCallback( + exec, new SegmentCallback() + { + @Override + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentAdded(server, segment); + segmentAddedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentRemoved(server, segment); + segmentRemovedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentViewInitialized() + { + CallbackAction res = callback.segmentViewInitialized(); + segmentViewInitLatch.countDown(); + return res; + } + } + ); + } + }; + + overlordServerView = new CoordinatorServerView( + baseView + ); + + baseView.start(); + } + + private void setupZNodeForServer(DruidServer server) throws Exception + { + curator.create() + .creatingParentsIfNeeded() + .forPath( + ZKPaths.makePath(announcementsPath, server.getHost()), + jsonMapper.writeValueAsBytes(server.getMetadata()) + ); + curator.create() + .creatingParentsIfNeeded() + .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); + } + + private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) + { + return DataSegment.builder() + .dataSource("test_overlord_server_view") + .interval(new Interval(intervalStr)) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(version) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NoneShardSpec()) + .binaryVersion(9) + .size(0) + .build(); + } + + @After + public void tearDown() throws Exception + { + baseView.stop(); + tearDownServerAndCurator(); + } +} diff --git a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java new file mode 100644 index 000000000000..1fba30287f59 --- /dev/null +++ b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java @@ -0,0 +1,66 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.IOException; + +public class ImmutableSegmentLoadInfoTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void testSerde() throws IOException + { + ImmutableSegmentLoadInfo segmentLoadInfo = new ImmutableSegmentLoadInfo( + new DataSegment( + "test_ds", + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + null, + null, + null, + new NoneShardSpec(), + 0, 0 + ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1)) + ); + + ImmutableSegmentLoadInfo serde = mapper.readValue( + mapper.writeValueAsBytes(segmentLoadInfo), + ImmutableSegmentLoadInfo.class + ); + + Assert.assertEquals(segmentLoadInfo, serde); + } + +} diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java new file mode 100644 index 000000000000..c57665344644 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -0,0 +1,357 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.client.coordinator.CoordinatorClient; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CoordinatorBasedSegmentHandoffNotifierTest +{ + + private final CoordinatorBasedSegmentHandoffNotifierConfig notifierConfig = new CoordinatorBasedSegmentHandoffNotifierConfig() + { + @Override + public Duration getPollDuration() + { + return Duration.millis(10); + } + }; + + @Test + public void testHandoffCallbackNotCalled() throws IOException, InterruptedException + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + SegmentDescriptor descriptor = new SegmentDescriptor( + interval, "v1", 2 + ); + DataSegment segment = new DataSegment( + "test_ds", + interval, + "v1", + null, + null, + null, + new NumberedShardSpec(2, 3), + 0, 0 + ); + + CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); + EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) + .andReturn( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + segment, + Sets.newHashSet( + createRealtimeServerMetadata("a1") + ) + ) + ) + ).anyTimes(); + EasyMock.replay(coordinatorClient); + CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( + "test_ds", + coordinatorClient, + notifierConfig + ); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + callbackCalled.set(true); + } + } + ); + notifier.checkForSegmentHandoffs(); + // callback should have registered + Assert.assertEquals(1, notifier.getHandOffCallbacks().size()); + Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor)); + Assert.assertFalse(callbackCalled.get()); + EasyMock.verify(coordinatorClient); + } + + @Test + public void testHandoffCallbackCalled() throws IOException, InterruptedException + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + SegmentDescriptor descriptor = new SegmentDescriptor( + interval, "v1", 2 + ); + DataSegment segment = new DataSegment( + "test_ds", + interval, + "v1", + null, + null, + null, + new NumberedShardSpec(2, 3), + 0, 0 + ); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); + EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) + .andReturn( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + segment, + Sets.newHashSet( + createHistoricalServerMetadata("a1") + ) + ) + ) + ).anyTimes(); + EasyMock.replay(coordinatorClient); + CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( + "test_ds", + coordinatorClient, + notifierConfig + ); + + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + callbackCalled.set(true); + } + } + ); + Assert.assertEquals(1, notifier.getHandOffCallbacks().size()); + Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor)); + notifier.checkForSegmentHandoffs(); + // callback should have been removed + Assert.assertTrue(notifier.getHandOffCallbacks().isEmpty()); + Assert.assertTrue(callbackCalled.get()); + EasyMock.verify(coordinatorClient); + } + + @Test + public void testHandoffChecksForVersion() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v2", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v2", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForAssignableServer() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createRealtimeServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + } + + @Test + public void testHandoffChecksForPartitionNumber() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 1) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForInterval() + { + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 1 + ), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-03" + ), "v1", 1 + ) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment( + new Interval( + "2011-04-01/2011-04-04" + ), "v1", 1 + ), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor( + new Interval( + "2011-04-02/2011-04-03" + ), "v1", 1 + ) + ) + ); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name) + { + return createServerMetadata(name, "realtime"); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name) + { + return createServerMetadata(name, "historical"); + } + + private DruidServerMetadata createServerMetadata(String name, String type) + { + return new DruidServerMetadata( + name, + name, + 10000, + type, + "tier", + 1 + ); + } + + private DataSegment createSegment(Interval interval, String version, int partitionNumber) + { + return new DataSegment( + "test_ds", + interval, + version, + null, + null, + null, + new NumberedShardSpec(partitionNumber, 100), + 0, 0 + ); + } +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index f478ed9ad0ca..3615ea33d377 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -43,6 +43,7 @@ import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.indexing.DataSchema; @@ -88,7 +89,8 @@ public class RealtimePlumberSchoolTest private DataSegmentAnnouncer announcer; private SegmentPublisher segmentPublisher; private DataSegmentPusher dataSegmentPusher; - private FilteredServerView serverView; + private SegmentHandoffNotifier handoffNotifier; + private SegmentHandoffNotifierFactory handoffNotifierFactory; private ServiceEmitter emitter; private RealtimeTuningConfig tuningConfig; private DataSchema schema; @@ -161,17 +163,20 @@ public void setUp() throws Exception segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class); dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class); - serverView = EasyMock.createMock(FilteredServerView.class); - serverView.registerSegmentCallback( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.>anyObject() - ); - EasyMock.expectLastCall().anyTimes(); + handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class); + EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes(); + EasyMock.expect( + handoffNotifier.registerSegmentHandoffCallback( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ) + ).andReturn(true).anyTimes(); emitter = EasyMock.createMock(ServiceEmitter.class); - EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); tuningConfig = new RealtimeTuningConfig( 1, @@ -195,7 +200,7 @@ public void setUp() throws Exception dataSegmentPusher, announcer, segmentPublisher, - serverView, + handoffNotifierFactory, MoreExecutors.sameThreadExecutor(), MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG, @@ -209,7 +214,7 @@ public void setUp() throws Exception @After public void tearDown() throws Exception { - EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher,handoffNotifierFactory, handoffNotifier, emitter); FileUtils.deleteDirectory( new File( tuningConfig.getBasePersistDirectory(), @@ -336,15 +341,15 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); plumber2.getSinks() - .put( - 0L, - new Sink( - testInterval, - schema2, - tuningConfig, - new DateTime("2014-12-01T12:34:56.789").toString() - ) - ); + .put( + 0L, + new Sink( + testInterval, + schema2, + tuningConfig, + new DateTime("2014-12-01T12:34:56.789").toString() + ) + ); Assert.assertNull(plumber2.startJob()); final Committer committer = new Committer() @@ -378,14 +383,18 @@ public void run() File persistDir = plumber2.computePersistDir(schema2, testInterval); /* Check that all hydrants were persisted */ - for (int i = 0; i < 5; i ++) { + for (int i = 0; i < 5; i++) { Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists()); } /* Create some gaps in the persisted hydrants and reload */ FileUtils.deleteDirectory(new File(persistDir, "1")); FileUtils.deleteDirectory(new File(persistDir, "3")); - RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + schema2, + tuningConfig, + metrics + ); restoredPlumber.bootstrapSinksFromDisk(); Map sinks = restoredPlumber.getSinks(); @@ -394,26 +403,37 @@ public void run() List hydrants = Lists.newArrayList(sinks.get(new Long(0))); DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z"); Assert.assertEquals(0, hydrants.get(0).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), - hydrants.get(0).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), + hydrants.get(0).getSegment().getDataInterval() + ); Assert.assertEquals(2, hydrants.get(1).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), - hydrants.get(1).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), + hydrants.get(1).getSegment().getDataInterval() + ); Assert.assertEquals(4, hydrants.get(2).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), - hydrants.get(2).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), + hydrants.get(2).getSegment().getDataInterval() + ); /* Delete all the hydrants and reload, no sink should be created */ FileUtils.deleteDirectory(new File(persistDir, "0")); FileUtils.deleteDirectory(new File(persistDir, "2")); FileUtils.deleteDirectory(new File(persistDir, "4")); - RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber( + schema2, + tuningConfig, + metrics + ); restoredPlumber2.bootstrapSinksFromDisk(); Assert.assertEquals(0, restoredPlumber2.getSinks().size()); } - private InputRow getTestInputRow(final String timeStr) { + private InputRow getTestInputRow(final String timeStr) + { return new InputRow() { @Override diff --git a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java index f24ed60f5fd9..7d20f62227eb 100644 --- a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java +++ b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.druid.client.CoordinatorServerView; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.InventoryView; @@ -42,7 +43,7 @@ public class DatasourcesResourceTest { - private InventoryView inventoryView; + private CoordinatorServerView inventoryView; private DruidServer server; private List listDataSources; private List dataSegmentList; @@ -50,7 +51,7 @@ public class DatasourcesResourceTest @Before public void setUp() { - inventoryView = EasyMock.createStrictMock(InventoryView.class); + inventoryView = EasyMock.createStrictMock(CoordinatorServerView.class); server = EasyMock.createStrictMock(DruidServer.class); dataSegmentList = new ArrayList<>(); dataSegmentList.add( diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 3911e0f96964..71fbba6a61b1 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -26,6 +26,8 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; import io.airlift.airline.Command; +import io.druid.audit.AuditManager; +import io.druid.client.CoordinatorServerView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.guice.ConfigProvider; import io.druid.guice.Jerseys; @@ -116,6 +118,7 @@ public void configure(Binder binder) .in(ManageLifecycle.class); binder.bind(IndexingServiceClient.class).in(LazySingleton.class); + binder.bind(CoordinatorServerView.class).in(LazySingleton.class); binder.bind(DruidCoordinator.class); @@ -134,6 +137,8 @@ public void configure(Binder binder) Jerseys.addResource(binder, MetadataResource.class); LifecycleModule.register(binder, Server.class); + LifecycleModule.register(binder, DatasourcesResource.class); + } @Provides diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 5a2b3bbf9ff9..048db35c1aae 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -31,6 +31,7 @@ import io.airlift.airline.Command; import io.airlift.airline.Option; import io.druid.client.cache.CacheConfig; +import io.druid.client.coordinator.CoordinatorClient; import io.druid.guice.Binders; import io.druid.guice.CacheModule; import io.druid.guice.IndexingServiceFirehoseModule; @@ -71,6 +72,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -128,7 +132,7 @@ public void configure(Binder binder) handlerProviderBinder.addBinding("noop") .to(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class); - + binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); @@ -162,12 +166,22 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule()); + JsonConfigProvider.bind( + binder, + "druid.segment.handoff", + CoordinatorBasedSegmentHandoffNotifierConfig.class + ); + binder.bind(SegmentHandoffNotifierFactory.class) + .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) + .in(LazySingleton.class); + // Override the default SegmentLoaderConfig because we don't actually care about the // configuration based locations. This will override them anyway. This is also stopping // configuration of other parameters, but I don't think that's actually a problem. // Note, if that is actually not a problem, then that probably means we have the wrong abstraction. binder.bind(SegmentLoaderConfig.class) .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList())); + binder.bind(CoordinatorClient.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); Jerseys.addResource(binder, QueryResource.class); diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index c0753fe01fa2..391a9ebc8505 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -24,6 +24,7 @@ import com.google.inject.multibindings.MapBinder; import io.druid.cli.QueryJettyServerInitializer; import io.druid.client.cache.CacheConfig; +import io.druid.client.coordinator.CoordinatorClient; import io.druid.metadata.MetadataSegmentPublisher; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.FireDepartment; @@ -34,6 +35,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -77,10 +81,20 @@ public void configure(Binder binder) .to(NoopChatHandlerProvider.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind(new TypeLiteral>(){}) + binder.bind( + new TypeLiteral>() + { + } + ) .toProvider(FireDepartmentsProvider.class) .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class); + binder.bind(SegmentHandoffNotifierFactory.class) + .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) + .in(LazySingleton.class); + binder.bind(CoordinatorClient.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule()); From 6c5a781153f17176fadcfe6d0a82956295e62f0e Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 5 Dec 2015 11:32:30 +0530 Subject: [PATCH 2/2] Remove FilteredServerView --- .../io/druid/indexing/common/TaskToolbox.java | 2 - .../druid/indexing/test/TestServerView.java | 15 +- .../client/BatchServerInventoryView.java | 90 ++--------- .../BatchServerInventoryViewProvider.java | 2 +- .../FilteredBatchServerViewProvider.java | 48 ------ .../io/druid/client/FilteredServerView.java | 30 ---- .../client/FilteredServerViewProvider.java | 32 ---- .../FilteredSingleServerViewProvider.java | 48 ------ .../client/SingleServerInventoryProvider.java | 2 +- .../client/SingleServerInventoryView.java | 82 +--------- .../java/io/druid/guice/ServerViewModule.java | 4 - .../io/druid/client/BrokerServerViewTest.java | 3 +- .../client/CoordinatorServerViewTest.java | 3 +- .../client/BatchServerInventoryViewTest.java | 148 ++---------------- .../plumber/RealtimePlumberSchoolTest.java | 3 - 15 files changed, 31 insertions(+), 481 deletions(-) delete mode 100644 server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java delete mode 100644 server/src/main/java/io/druid/client/FilteredServerView.java delete mode 100644 server/src/main/java/io/druid/client/FilteredServerViewProvider.java delete mode 100644 server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 7a619a9decf1..0bd54916e6ca 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -25,12 +25,10 @@ import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index b9185547e096..d281c8db4928 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -19,34 +19,21 @@ package io.druid.indexing.test; -import com.google.api.client.util.Lists; import com.google.common.base.Predicate; import com.google.common.collect.Maps; import com.metamx.common.Pair; -import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -public class TestServerView implements FilteredServerView, ServerView.SegmentCallback +public class TestServerView implements ServerView.SegmentCallback { final ConcurrentMap, Executor>> callbacks = Maps.newConcurrentMap(); - @Override - public void registerSegmentCallback( - final Executor exec, - final ServerView.SegmentCallback callback, - final Predicate filter - ) - { - callbacks.put(callback, Pair.of(filter, exec)); - } - @Override public ServerView.CallbackAction segmentAdded( final DruidServerMetadata server, diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 4bea30d0c8f5..28cdfcdfb5fc 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -19,42 +19,33 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; /** */ @ManageLifecycle -public class BatchServerInventoryView extends ServerInventoryView> implements FilteredServerView +public class BatchServerInventoryView extends ServerInventoryView> { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); final private ConcurrentMap> zNodes = new MapMaker().makeMap(); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - final private Predicate defaultFilter; @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -63,11 +54,10 @@ public BatchServerInventoryView( zkPaths.getLiveSegmentsPath(), curator, jsonMapper, - new TypeReference>(){} + new TypeReference>() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -77,12 +67,8 @@ protected DruidServer addInnerInventory( final Set inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - - zNodes.put(inventoryKey, filteredInventory); - for (DataSegment segment : filteredInventory) { + zNodes.put(inventoryKey, inventory); + for (DataSegment segment : inventory) { addSingleInventory(container, segment); } return container; @@ -93,22 +79,18 @@ protected DruidServer updateInnerInventory( DruidServer container, String inventoryKey, Set inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - Set existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); } - for (DataSegment segment : Sets.difference(filteredInventory, existing)) { + for (DataSegment segment : Sets.difference(inventory, existing)) { addSingleInventory(container, segment); } - for (DataSegment segment : Sets.difference(existing, filteredInventory)) { + for (DataSegment segment : Sets.difference(existing, inventory)) { removeSingleInventory(container, segment.getIdentifier()); } - zNodes.put(inventoryKey, filteredInventory); + zNodes.put(inventoryKey, inventory); return container; } @@ -129,56 +111,4 @@ protected DruidServer removeInnerInventory(final DruidServer container, String i } return container; } - - @Override - public void registerSegmentCallback( - final Executor exec, final SegmentCallback callback, final Predicate filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java index 664952e4a024..24313b52c847 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java @@ -45,6 +45,6 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv @Override public BatchServerInventoryView get() { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new BatchServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java deleted file mode 100644 index cc15323448ce..000000000000 --- a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredBatchServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public BatchServerInventoryView get() - { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/FilteredServerView.java b/server/src/main/java/io/druid/client/FilteredServerView.java deleted file mode 100644 index 67c17b782101..000000000000 --- a/server/src/main/java/io/druid/client/FilteredServerView.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.client; - -import com.google.common.base.Predicate; -import io.druid.timeline.DataSegment; - -import java.util.concurrent.Executor; - -public interface FilteredServerView -{ - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ); -} diff --git a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java deleted file mode 100644 index 2a6e4a470d18..000000000000 --- a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.client; - - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class), - @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class) -}) -public interface FilteredServerViewProvider extends Provider -{ -} diff --git a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java deleted file mode 100644 index 3acbb06c9d6b..000000000000 --- a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredSingleServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public SingleServerInventoryView get() - { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java index 30b06a1c5d7b..3d4e04800cf9 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java @@ -45,6 +45,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide @Override public ServerInventoryView get() { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new SingleServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 277b54320b50..a330a5c53d0e 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -19,37 +19,25 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.MapMaker; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; - /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerView +public class SingleServerInventoryView extends ServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - private final Predicate defaultFilter; - @Inject public SingleServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -58,11 +46,10 @@ public SingleServerInventoryView( zkPaths.getServedSegmentsPath(), curator, jsonMapper, - new TypeReference(){} + new TypeReference() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -70,10 +57,7 @@ protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, DataSegment inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - if(predicate.apply(inventory)) { - addSingleInventory(container, inventory); - } + addSingleInventory(container, inventory); return container; } @@ -91,58 +75,4 @@ protected DruidServer removeInnerInventory(DruidServer container, String invento removeSingleInventory(container, inventoryKey); return container; } - - @Override - public void registerSegmentCallback( - final Executor exec, final SegmentCallback callback, final Predicate filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index 8f2159e260ba..8bf09fd3d049 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -19,8 +19,6 @@ import com.google.inject.Binder; import com.google.inject.Module; -import io.druid.client.FilteredServerView; -import io.druid.client.FilteredServerViewProvider; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -34,10 +32,8 @@ public class ServerViewModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); - JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); - binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class); } } diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index c178648f4531..7b3550203a8a 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -320,8 +320,7 @@ private void setupViews() throws Exception baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ) { @Override diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index 13def515cf3a..5b31d569fee5 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -306,8 +306,7 @@ private void setupViews() throws Exception baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ) { @Override diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index 346ef2595514..f02bdd21554e 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -18,8 +18,6 @@ package io.druid.client.client; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -31,7 +29,6 @@ import com.metamx.common.ISE; import io.druid.client.BatchServerInventoryView; import io.druid.client.DruidServer; -import io.druid.client.ServerView; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; @@ -45,9 +42,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.LogicalOperator; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -57,9 +51,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -157,8 +149,7 @@ public String getBase() } }, cf, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ); batchServerInventoryView.start(); @@ -173,16 +164,9 @@ public String getBase() } }, cf, - jsonMapper, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS)); - } - } - ){ + jsonMapper + ) + { @Override protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, Set inventory @@ -241,122 +225,6 @@ public void testRun() throws Exception Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); } - @Test - public void testRunWithFilter() throws Exception - { - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - int prevUpdateCount = inventoryUpdateCounter.get(); - // segment outside the range of default filter - DataSegment segment1 = makeSegment(101); - segmentAnnouncer.announceSegment(segment1); - testSegments.add(segment1); - - waitForUpdateEvents(prevUpdateCount + 1); - Assert.assertNull( - Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory()) - .getSegment(segment1.getIdentifier()) - ); - } - - @Test - public void testRunWithFilterCallback() throws Exception - { - final CountDownLatch removeCallbackLatch = new CountDownLatch(1); - - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - - ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class); - Comparator dataSegmentComparator = new Comparator() - { - @Override - public int compare(DataSegment o1, DataSegment o2) - { - return o1.getInterval().equals(o2.getInterval()) ? 0 : -1; - } - }; - - EasyMock - .expect( - callback.segmentAdded( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andReturn(ServerView.CallbackAction.CONTINUE) - .times(1); - - EasyMock - .expect( - callback.segmentRemoved( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andAnswer( - new IAnswer() - { - @Override - public ServerView.CallbackAction answer() throws Throwable - { - removeCallbackLatch.countDown(); - return ServerView.CallbackAction.CONTINUE; - } - } - ) - .times(1); - - - EasyMock.replay(callback); - - filteredBatchServerInventoryView.registerSegmentCallback( - MoreExecutors.sameThreadExecutor(), - callback, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2)); - } - } - ); - - DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2); - segmentAnnouncer.announceSegment(segment2); - testSegments.add(segment2); - - DataSegment oldSegment = makeSegment(-1); - segmentAnnouncer.announceSegment(oldSegment); - testSegments.add(oldSegment); - - segmentAnnouncer.unannounceSegment(oldSegment); - testSegments.remove(oldSegment); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - segmentAnnouncer.unannounceSegment(segment2); - testSegments.remove(segment2); - - waitForSync(filteredBatchServerInventoryView, testSegments); - timing.forWaiting().awaitLatch(removeCallbackLatch); - - EasyMock.verify(callback); - } - private DataSegment makeSegment(int offset) { return DataSegment.builder() @@ -393,7 +261,11 @@ private void waitForUpdateEvents(int count) while (inventoryUpdateCounter.get() != count) { Thread.sleep(100); if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) { - throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get()); + throw new ISE( + "BatchServerInventoryView is not updating counter expected[%d] value[%d]", + count, + inventoryUpdateCounter.get() + ); } } } @@ -457,7 +329,7 @@ public String getBase() List segments = new ArrayList(); try { for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { - segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); + segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); } latch.countDown(); latch.await(); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 3615ea33d377..d55269fedaea 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -20,7 +20,6 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.base.Suppliers; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,8 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Committer; import io.druid.data.input.InputRow;