diff --git a/common/src/main/java/io/druid/timeline/TimelineLookup.java b/common/src/main/java/io/druid/timeline/TimelineLookup.java index 9eca6534a315..25ce7c87f03d 100644 --- a/common/src/main/java/io/druid/timeline/TimelineLookup.java +++ b/common/src/main/java/io/druid/timeline/TimelineLookup.java @@ -37,6 +37,17 @@ public interface TimelineLookup */ public Iterable> lookup(Interval interval); + /** + * Does a lookup for the objects representing the given time interval. Will also return + * incomplete PartitionHolders. + * + * @param interval interval to find objects for + * + * @return Holders representing the interval that the objects exist for, PartitionHolders + * can be incomplete. Holders returned sorted by the interval. + */ + public Iterable> lookupWithIncompletePartitions(Interval interval); + public PartitionHolder findEntry(Interval interval, VersionType version); } diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java index 57c9005b6295..19219f59d160 100644 --- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java +++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java @@ -192,6 +192,18 @@ public List> lookup(Interval inter } } + @Override + public Iterable> lookupWithIncompletePartitions(Interval interval) + { + try { + lock.readLock().lock(); + return lookup(interval, true); + } + finally { + lock.readLock().unlock(); + } + } + public Set> findOvershadowed() { try { diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 0ea9c8b80fdf..e4bae5ee1c9b 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -257,6 +257,16 @@ This config is used to find the [Indexing Service](../design/indexing-service.ht |--------|-----------|-------| |`druid.selectors.indexing.serviceName`|The druid.service name of the indexing service Overlord node. To start the Overlord with a different name, set it with this property. |druid/overlord| + +### Coordinator Discovery + +This config is used to find the [Coordinator](../design/coordinator.html) using Curator service discovery. This config is used by the realtime indexing nodes to get information about the segments loaded in the cluster. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.selectors.coordinator.serviceName`|The druid.service name of the coordinator node. To start the Coordinator with a different name, set it with this property. |druid/coordinator| + + ### Announcing Segments You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs. diff --git a/docs/content/configuration/production-cluster.md b/docs/content/configuration/production-cluster.md index 79c690f5fdf0..ab2a5c489c58 100644 --- a/docs/content/configuration/production-cluster.md +++ b/docs/content/configuration/production-cluster.md @@ -63,6 +63,8 @@ druid.cache.readBufferSize=10485760 # Indexing Service Service Discovery druid.selectors.indexing.serviceName=druid:prod:overlord +# Coordinator Service Discovery +druid.selectors.coordinator.serviceName=druid:prod:coordinator ``` ### Overlord Node diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 12a5d450f195..3dfc545dde6d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -28,12 +28,10 @@ import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -45,6 +43,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -63,14 +62,14 @@ public class TaskToolbox { private final TaskConfig config; private final Task task; - private final TaskActionClientFactory taskActionClientFactory; + private final TaskActionClient taskActionClient; private final ServiceEmitter emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; private final ExecutorService queryExecutorService; @@ -86,14 +85,14 @@ public class TaskToolbox public TaskToolbox( TaskConfig config, Task task, - TaskActionClientFactory taskActionClientFactory, + TaskActionClient taskActionClient, ServiceEmitter emitter, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, + SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -108,14 +107,14 @@ public TaskToolbox( { this.config = config; this.task = task; - this.taskActionClientFactory = taskActionClientFactory; + this.taskActionClient = taskActionClient; this.emitter = emitter; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -135,7 +134,7 @@ public TaskConfig getConfig() public TaskActionClient getTaskActionClient() { - return taskActionClientFactory.create(task); + return taskActionClient; } public ServiceEmitter getEmitter() @@ -168,9 +167,9 @@ public DataSegmentAnnouncer getSegmentAnnouncer() return segmentAnnouncer; } - public FilteredServerView getNewSegmentServerView() + public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory() { - return newSegmentServerView; + return handoffNotifierFactory; } public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 0349c2a995af..055da6a3a48e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,10 +24,10 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -38,6 +38,7 @@ import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import java.io.File; @@ -56,7 +57,7 @@ public class TaskToolboxFactory private final DataSegmentMover dataSegmentMover; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; @@ -77,7 +78,7 @@ public TaskToolboxFactory( DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, + SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -97,7 +98,7 @@ public TaskToolboxFactory( this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -112,18 +113,17 @@ public TaskToolboxFactory( public TaskToolbox build(Task task) { final File taskWorkDir = config.getTaskWorkDir(task.getId()); - return new TaskToolbox( config, task, - taskActionClientFactory, + taskActionClientFactory.create(task), emitter, segmentPusher, dataSegmentKiller, dataSegmentMover, dataSegmentArchiver, segmentAnnouncer, - newSegmentServerView, + handoffNotifierFactory, queryRunnerFactoryConglomerate, queryExecutorService, monitorScheduler, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index b2cb6717074d..87f07a42135b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -287,7 +287,7 @@ public String getVersion(final Interval interval) toolbox.getSegmentPusher(), lockingSegmentAnnouncer, segmentPublisher, - toolbox.getNewSegmentServerView(), + toolbox.getSegmentHandoffNotifierFactory(), toolbox.getQueryExecutorService(), toolbox.getIndexMerger(), toolbox.getIndexIO(), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index b3c4512bc5de..75e163713785 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -39,6 +38,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; @@ -66,7 +66,9 @@ public class TaskToolboxTest private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class); private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class); private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class); - private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class); + private SegmentHandoffNotifierFactory mockHandoffNotifierFactory = EasyMock.createNiceMock( + SegmentHandoffNotifierFactory.class + ); private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class); private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); @@ -86,7 +88,8 @@ public class TaskToolboxTest public void setUp() throws IOException { EasyMock.expect(task.getId()).andReturn("task_id").anyTimes(); - EasyMock.replay(task); + EasyMock.expect(task.getDataSource()).andReturn("task_ds").anyTimes(); + EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), @@ -97,7 +100,7 @@ public void setUp() throws IOException mockDataSegmentMover, mockDataSegmentArchiver, mockSegmentAnnouncer, - mockNewSegmentServerView, + mockHandoffNotifierFactory, mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, @@ -122,12 +125,6 @@ public void testGetSegmentAnnouncer() Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer()); } - @Test - public void testGetNewSegmentServerView() - { - Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView()); - } - @Test public void testGetQueryRunnerFactoryConglomerate() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 095c4aaf710b..156ba36dff17 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -223,26 +223,19 @@ private final List runTask(final IndexTask indexTask) throws Except indexTask.run( new TaskToolbox( - null, null, new TaskActionClientFactory() + null, null, new TaskActionClient() { @Override - public TaskActionClient create(Task task) + public RetType submit(TaskAction taskAction) throws IOException { - return new TaskActionClient() - { - @Override - public RetType submit(TaskAction taskAction) throws IOException - { - if (taskAction instanceof LockListAction) { - return (RetType) Arrays.asList( - new TaskLock( - "", "", null, new DateTime().toString() - ) - ); - } - return null; - } - }; + if (taskAction instanceof LockListAction) { + return (RetType) Arrays.asList( + new TaskLock( + "", "", null, new DateTime().toString() + ) + ); + } + return null; } }, null, new DataSegmentPusher() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 086ae3715a2e..71ab5107d46f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; @@ -60,7 +62,6 @@ import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.indexing.test.TestDataSegmentPusher; import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; -import io.druid.indexing.test.TestServerView; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; @@ -72,6 +73,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -90,9 +92,10 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.metrics.EventReceiverFirehoseRegister; -import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -107,7 +110,9 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; public class RealtimeIndexTaskTest { @@ -136,6 +141,8 @@ public class RealtimeIndexTaskTest private DateTime now; private ListeningExecutorService taskExec; + private Map> handOffCallbacks; + private SegmentHandoffNotifierFactory handoffNotifierFactory; @Before public void setUp() @@ -204,9 +211,10 @@ public void testBasics() throws Exception Assert.assertEquals(2, countEvents(task)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -294,9 +302,10 @@ public void testRestore() throws Exception Assert.assertEquals(2, countEvents(task2)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -485,6 +494,42 @@ public void registerQuery(Query query, ListenableFuture future) ) ) ); + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() + { + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new SegmentHandoffNotifier() + { + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + return true; + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; + } + }; final TestUtils testUtils = new TestUtils(); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( taskConfig, @@ -495,7 +540,7 @@ public void registerQuery(Query query, ListenableFuture future) null, // DataSegmentMover null, // DataSegmentArchiver new TestDataSegmentAnnouncer(), - new TestServerView(), + handoffNotifierFactory, conglomerate, MoreExecutors.sameThreadExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 924dbd82c5b3..53f409af36a9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -78,8 +78,10 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.AfterClass; import org.junit.Assert; @@ -198,6 +200,8 @@ public void deleteSegments(Set segments) ts, new TaskActionToolbox(tl, mdc, newMockEmitter()) ); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), @@ -249,7 +253,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException } }, null, // segment announcer - null, // new segment server view + notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler @@ -521,5 +525,7 @@ public void emit(ServiceEventBuilder builder) } }; + + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 1af3c34429f7..23c391719cfe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -62,10 +62,12 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; import org.apache.commons.io.FileUtils; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -293,6 +295,8 @@ public RetType submit(TaskAction taskAction) throws IOExcepti } } }; + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), new TaskActionClientFactory() @@ -308,8 +312,8 @@ public TaskActionClient create(Task task) null, // segment killer null, // segment mover null, // segment archiver - null, // segment announcer - null, // new segment server view + null, // segment announcer, + notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 45cf6ae6b38a..171cdbfeb4f7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -26,16 +26,17 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Event; @@ -43,8 +44,6 @@ import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -61,6 +60,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; @@ -69,7 +69,6 @@ import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; -import io.druid.indexing.common.task.RealtimeIndexTaskTest; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; @@ -78,6 +77,7 @@ import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -98,8 +98,9 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentTest; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -118,7 +119,6 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -197,7 +197,6 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private TaskToolboxFactory tb = null; private IndexSpec indexSpec; private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; - private FilteredServerView serverView; private MonitorScheduler monitorScheduler; private ServiceEmitter emitter; private TaskQueueConfig tqc; @@ -205,7 +204,9 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private int announcedSinks; private static CountDownLatch publishCountDown; private TestDerbyConnector testDerbyConnector; - private List segmentCallbacks = new ArrayList<>(); + private SegmentHandoffNotifierFactory handoffNotifierFactory; + private Map> handOffCallbacks; + private static TestIndexerMetadataStorageCoordinator newMockMDC() { @@ -393,15 +394,42 @@ public void setUp() throws Exception } else { throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); } - - serverView = new FilteredServerView() + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() { @Override - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ) + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { - segmentCallbacks.add(callback); + return new SegmentHandoffNotifier() + { + + + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + return true; + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; } }; setUpAndStartTaskQueue( @@ -485,7 +513,7 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } }, // segment announcer - serverView, // new segment server view + handoffNotifierFactory, queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective null, // query executor service monitorScheduler, // monitor scheduler @@ -855,16 +883,10 @@ public void testRealtimeIndexTask() throws Exception Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS)); // Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status - segmentCallbacks.get(0).segmentAdded( - new DruidServerMetadata( - "dummy", - "dummy_host", - 0, - "historical", - "dummy_tier", - 0 - ), mdc.getPublished().iterator().next() - ); + Assert.assertEquals(1, handOffCallbacks.size()); + Pair executorRunnablePair = Iterables.getOnlyElement(handOffCallbacks.values()); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + handOffCallbacks.clear(); // Wait for realtime index task to handle callback in plumber and succeed while (tsqa.getStatus(taskId).get().isRunnable()) { diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index b9185547e096..d281c8db4928 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -19,34 +19,21 @@ package io.druid.indexing.test; -import com.google.api.client.util.Lists; import com.google.common.base.Predicate; import com.google.common.collect.Maps; import com.metamx.common.Pair; -import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -public class TestServerView implements FilteredServerView, ServerView.SegmentCallback +public class TestServerView implements ServerView.SegmentCallback { final ConcurrentMap, Executor>> callbacks = Maps.newConcurrentMap(); - @Override - public void registerSegmentCallback( - final Executor exec, - final ServerView.SegmentCallback callback, - final Predicate filter - ) - { - callbacks.put(callback, Pair.of(filter, exec)); - } - @Override public ServerView.CallbackAction segmentAdded( final DruidServerMetadata server, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 1a9a6377def2..aa29337f4e45 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -32,7 +32,10 @@ import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; @@ -41,6 +44,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -48,6 +52,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; +import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -137,6 +142,11 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); + TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); + EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes(); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory); return new WorkerTaskMonitor( jsonMapper, cf, @@ -144,7 +154,8 @@ private WorkerTaskMonitor createTaskMonitor() new ThreadPoolTaskRunner( new TaskToolboxFactory( taskConfig, - null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( + taskActionClientFactory, + null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 44c8f7a9fe0e..35fc92325dc2 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -21,42 +21,33 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; /** */ @ManageLifecycle -public class BatchServerInventoryView extends ServerInventoryView> implements FilteredServerView +public class BatchServerInventoryView extends ServerInventoryView> { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); final private ConcurrentMap> zNodes = new MapMaker().makeMap(); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - final private Predicate defaultFilter; @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -65,11 +56,10 @@ public BatchServerInventoryView( zkPaths.getLiveSegmentsPath(), curator, jsonMapper, - new TypeReference>(){} + new TypeReference>() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -79,12 +69,8 @@ protected DruidServer addInnerInventory( final Set inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - - zNodes.put(inventoryKey, filteredInventory); - for (DataSegment segment : filteredInventory) { + zNodes.put(inventoryKey, inventory); + for (DataSegment segment : inventory) { addSingleInventory(container, segment); } return container; @@ -95,22 +81,18 @@ protected DruidServer updateInnerInventory( DruidServer container, String inventoryKey, Set inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - Set existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); } - for (DataSegment segment : Sets.difference(filteredInventory, existing)) { + for (DataSegment segment : Sets.difference(inventory, existing)) { addSingleInventory(container, segment); } - for (DataSegment segment : Sets.difference(existing, filteredInventory)) { + for (DataSegment segment : Sets.difference(existing, inventory)) { removeSingleInventory(container, segment.getIdentifier()); } - zNodes.put(inventoryKey, filteredInventory); + zNodes.put(inventoryKey, inventory); return container; } @@ -131,56 +113,4 @@ protected DruidServer removeInnerInventory(final DruidServer container, String i } return container; } - - @Override - public void registerSegmentCallback( - final Executor exec, final SegmentCallback callback, final Predicate filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java index 3d08c753f216..9a1ef188f00e 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java @@ -47,6 +47,6 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv @Override public BatchServerInventoryView get() { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new BatchServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java new file mode 100644 index 000000000000..e61d7a309dc3 --- /dev/null +++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java @@ -0,0 +1,209 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; +import io.druid.query.DataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * ServerView of coordinator for the state of segments being loaded in the cluster. + */ +public class CoordinatorServerView implements InventoryView +{ + private static final Logger log = new Logger(CoordinatorServerView.class); + + private final Object lock = new Object(); + + private final Map segmentLoadInfos; + private final Map> timelines; + + private final ServerInventoryView baseView; + + private volatile boolean initialized = false; + + @Inject + public CoordinatorServerView( + ServerInventoryView baseView + ) + { + this.baseView = baseView; + this.segmentLoadInfos = Maps.newHashMap(); + this.timelines = Maps.newHashMap(); + + ExecutorService exec = Execs.singleThreaded("CoordinatorServerView-%s"); + baseView.registerSegmentCallback( + exec, + new ServerView.SegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + serverAddedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment) + { + serverRemovedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentViewInitialized() + { + initialized = true; + return ServerView.CallbackAction.CONTINUE; + } + } + ); + + baseView.registerServerCallback( + exec, + new ServerView.ServerCallback() + { + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + removeServer(server); + return ServerView.CallbackAction.CONTINUE; + } + } + ); + } + + public boolean isInitialized() + { + return initialized; + } + + public void clear() + { + synchronized (lock) { + timelines.clear(); + segmentLoadInfos.clear(); + } + } + + private void removeServer(DruidServer server) + { + for (DataSegment segment : server.getSegments().values()) { + serverRemovedSegment(server.getMetadata(), segment); + } + } + + private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) + { + String segmentId = segment.getIdentifier(); + synchronized (lock) { + log.debug("Adding segment[%s] for server[%s]", segment, server); + + SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId); + if (segmentLoadInfo == null) { + // servers escape the scope of this object so use ConcurrentSet + segmentLoadInfo = new SegmentLoadInfo(segment); + + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + timelines.put(segment.getDataSource(), timeline); + } + + timeline.add( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(segmentLoadInfo) + ); + segmentLoadInfos.put(segmentId, segmentLoadInfo); + } + segmentLoadInfo.addServer(server); + } + } + + private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) + { + String segmentId = segment.getIdentifier(); + + + synchronized (lock) { + log.debug("Removing segment[%s] from server[%s].", segmentId, server); + + final SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId); + if (segmentLoadInfo == null) { + log.warn("Told to remove non-existant segment[%s]", segmentId); + return; + } + segmentLoadInfo.removeServer(server); + if (segmentLoadInfo.isEmpty()) { + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + segmentLoadInfos.remove(segmentId); + + final PartitionChunk removedPartition = timeline.remove( + segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk( + new SegmentLoadInfo( + segment + ) + ) + ); + + if (removedPartition == null) { + log.warn( + "Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", + segment.getInterval(), + segment.getVersion() + ); + } + } + } + } + + public VersionedIntervalTimeline getTimeline(DataSource dataSource) + { + String table = Iterables.getOnlyElement(dataSource.getNames()); + synchronized (lock) { + return timelines.get(table); + } + } + + + @Override + public DruidServer getInventoryValue(String string) + { + return baseView.getInventoryValue(string); + } + + @Override + public Iterable getInventory() + { + return baseView.getInventory(); + } +} diff --git a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java deleted file mode 100644 index 190a031667af..000000000000 --- a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredBatchServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public BatchServerInventoryView get() - { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/FilteredServerView.java b/server/src/main/java/io/druid/client/FilteredServerView.java deleted file mode 100644 index 38496db89a7f..000000000000 --- a/server/src/main/java/io/druid/client/FilteredServerView.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.google.common.base.Predicate; -import io.druid.timeline.DataSegment; - -import java.util.concurrent.Executor; - -public interface FilteredServerView -{ - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ); -} diff --git a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java deleted file mode 100644 index 06e4a0aeb775..000000000000 --- a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class), - @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class) -}) -public interface FilteredServerViewProvider extends Provider -{ -} diff --git a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java deleted file mode 100644 index c7f2902665c5..000000000000 --- a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredSingleServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public SingleServerInventoryView get() - { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java b/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java new file mode 100644 index 000000000000..0eebd6766c7e --- /dev/null +++ b/server/src/main/java/io/druid/client/ImmutableSegmentLoadInfo.java @@ -0,0 +1,96 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.Set; + +public class ImmutableSegmentLoadInfo +{ + private final DataSegment segment; + private final ImmutableSet servers; + + @JsonCreator + public ImmutableSegmentLoadInfo( + @JsonProperty("segment") DataSegment segment, + @JsonProperty("servers") Set servers + ) + { + Preconditions.checkNotNull(segment, "segment"); + Preconditions.checkNotNull(servers, "servers"); + this.segment = segment; + this.servers = ImmutableSet.copyOf(servers); + } + + @JsonProperty("segment") + public DataSegment getSegment() + { + return segment; + } + + @JsonProperty("servers") + public ImmutableSet getServers() + { + return servers; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ImmutableSegmentLoadInfo that = (ImmutableSegmentLoadInfo) o; + + if (!segment.equals(that.segment)) { + return false; + } + return servers.equals(that.servers); + + } + + @Override + public int hashCode() + { + int result = segment.hashCode(); + result = 31 * result + servers.hashCode(); + return result; + } + + @Override + public String toString() + { + return "SegmentLoadInfo{" + + "segment=" + segment + + ", servers=" + servers + + '}'; + } +} diff --git a/server/src/main/java/io/druid/client/SegmentLoadInfo.java b/server/src/main/java/io/druid/client/SegmentLoadInfo.java new file mode 100644 index 000000000000..6dac43733d9c --- /dev/null +++ b/server/src/main/java/io/druid/client/SegmentLoadInfo.java @@ -0,0 +1,105 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Set; + +public class SegmentLoadInfo +{ + private final DataSegment segment; + private final Set servers; + + public SegmentLoadInfo(DataSegment segment) + { + Preconditions.checkNotNull(segment, "segment"); + this.segment = segment; + this.servers = Sets.newConcurrentHashSet(); + } + + public DataSegment getSegment() + { + return segment; + } + + public boolean addServer(DruidServerMetadata server) + { + return servers.add(server); + } + + public boolean removeServer(DruidServerMetadata server) + { + return servers.remove(server); + } + + public boolean isEmpty() + { + return servers.isEmpty(); + } + + public ImmutableSegmentLoadInfo toImmutableSegmentLoadInfo() + { + return new ImmutableSegmentLoadInfo(segment, servers); + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SegmentLoadInfo that = (SegmentLoadInfo) o; + + if (!segment.equals(that.segment)) { + return false; + } + return servers.equals(that.servers); + + } + + @Override + public int hashCode() + { + int result = segment.hashCode(); + result = 31 * result + servers.hashCode(); + return result; + } + + @Override + public String toString() + { + return "SegmentLoadInfo{" + + "segment=" + segment + + ", servers=" + servers + + '}'; + } +} diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java index 1a63607f10fd..76d170a34aff 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java @@ -47,6 +47,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide @Override public ServerInventoryView get() { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new SingleServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 336a3087b049..3a51cf0dd968 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -21,37 +21,25 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.MapMaker; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; - /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerView +public class SingleServerInventoryView extends ServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - private final Predicate defaultFilter; - @Inject public SingleServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -60,11 +48,10 @@ public SingleServerInventoryView( zkPaths.getServedSegmentsPath(), curator, jsonMapper, - new TypeReference(){} + new TypeReference() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -72,10 +59,7 @@ protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, DataSegment inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - if(predicate.apply(inventory)) { - addSingleInventory(container, inventory); - } + addSingleInventory(container, inventory); return container; } @@ -93,58 +77,4 @@ protected DruidServer removeInnerInventory(DruidServer container, String invento removeSingleInventory(container, inventoryKey); return container; } - - @Override - public void registerSegmentCallback( - final Executor exec, final SegmentCallback callback, final Predicate filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/client/coordinator/Coordinator.java b/server/src/main/java/io/druid/client/coordinator/Coordinator.java new file mode 100644 index 000000000000..3577b73854a2 --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/Coordinator.java @@ -0,0 +1,36 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Coordinator +{ +} diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java new file mode 100644 index 000000000000..0cd6f4e139ed --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -0,0 +1,125 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.annotations.Global; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; + +import java.net.URI; +import java.net.URL; +import java.util.List; + +public class CoordinatorClient +{ + private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); + + private final HttpClient client; + private final ObjectMapper jsonMapper; + private final ServerDiscoverySelector selector; + + @Inject + public CoordinatorClient( + @Global HttpClient client, + ObjectMapper jsonMapper, + @Coordinator ServerDiscoverySelector selector + ) + { + this.client = client; + this.jsonMapper = jsonMapper; + this.selector = selector; + } + + + public List fetchServerView(String dataSource, Interval interval, boolean incompleteOk) + { + try { + StatusResponseHolder response = client.go( + new Request( + HttpMethod.GET, + new URL( + String.format( + "%s/datasources/%s/intervals/%s/serverview?partial=%s", + baseUrl(), + dataSource, + interval.toString().replace("/", "_"), + incompleteOk + ) + ) + ), + RESPONSE_HANDLER + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching serverView status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), new TypeReference>() + { + + } + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + + private String baseUrl() + { + try { + final Server instance = selector.pick(); + if (instance == null) { + throw new ISE("Cannot find instance of coordinator"); + } + + return new URI( + instance.getScheme(), + null, + instance.getAddress(), + instance.getPort(), + "/druid/coordinator/v1", + null, + null + ).toString(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java new file mode 100644 index 000000000000..5198c0576d94 --- /dev/null +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorSelectorConfig.java @@ -0,0 +1,37 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.coordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class CoordinatorSelectorConfig +{ + public static final String DEFAULT_SERVICE_NAME = "druid/coordinator"; + + @JsonProperty + private String serviceName = DEFAULT_SERVICE_NAME; + + public String getServiceName() + { + return serviceName; + } +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java new file mode 100644 index 000000000000..1d2e1a85b0c3 --- /dev/null +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import io.druid.client.coordinator.Coordinator; +import io.druid.client.coordinator.CoordinatorSelectorConfig; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; + +/** + */ +public class CoordinatorDiscoveryModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class); + } + + @Provides + @Coordinator + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( + CoordinatorSelectorConfig config, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + return serverDiscoveryFactory.createSelector(config.getServiceName()); + } +} diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index 803fbfacec8e..356122e65a8a 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -21,8 +21,6 @@ import com.google.inject.Binder; import com.google.inject.Module; -import io.druid.client.FilteredServerView; -import io.druid.client.FilteredServerViewProvider; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -36,10 +34,8 @@ public class ServerViewModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); - JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); - binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class); } } diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 5f3654fab3d6..0a13b6d78711 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -35,6 +35,7 @@ import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; +import io.druid.guice.CoordinatorDiscoveryModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; import io.druid.guice.ExtensionsConfig; @@ -298,6 +299,7 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter new DerbyMetadataStorageDruidModule(), new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), + new CoordinatorDiscoveryModule(), new LocalDataStorageDruidModule(), new FirehoseModule(), new ParsersModule() diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java new file mode 100644 index 000000000000..f27d8b8537ce --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -0,0 +1,172 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.client.coordinator.CoordinatorClient; +import io.druid.concurrent.Execs; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier +{ + private static final Logger log = new Logger(CoordinatorBasedSegmentHandoffNotifier.class); + + private final ConcurrentMap> handOffCallbacks = Maps.newConcurrentMap(); + private final CoordinatorClient coordinatorClient; + private volatile ScheduledExecutorService scheduledExecutor; + private final long pollDurationMillis; + private final String dataSource; + + public CoordinatorBasedSegmentHandoffNotifier( + String dataSource, + CoordinatorClient coordinatorClient, + CoordinatorBasedSegmentHandoffNotifierConfig config + ) + { + this.dataSource = dataSource; + this.coordinatorClient = coordinatorClient; + this.pollDurationMillis = config.getPollDuration().getMillis(); + } + + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + log.info("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", dataSource, descriptor); + Pair prev = handOffCallbacks.putIfAbsent( + descriptor, + new Pair<>(exec, handOffRunnable) + ); + return prev == null; + } + + @Override + public void start() + { + scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d"); + scheduledExecutor.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + checkForSegmentHandoffs(); + } + }, 0L, pollDurationMillis, TimeUnit.MILLISECONDS + ); + } + + void checkForSegmentHandoffs() + { + try { + Iterator>> itr = handOffCallbacks.entrySet() + .iterator(); + while (itr.hasNext()) { + Map.Entry> entry = itr.next(); + SegmentDescriptor descriptor = entry.getKey(); + try { + List loadedSegments = coordinatorClient.fetchServerView( + dataSource, + descriptor.getInterval(), + true + ); + + if (isHandOffComplete(loadedSegments, entry.getKey())) { + log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor); + entry.getValue().lhs.execute(entry.getValue().rhs); + itr.remove(); + } + } + catch (Exception e) { + log.error( + e, + "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs", + dataSource, + descriptor, + pollDurationMillis + ); + } + } + if (!handOffCallbacks.isEmpty()) { + log.info("Still waiting for Handoff for Segments : [%s]", handOffCallbacks.keySet()); + } + } + catch (Throwable t) { + log.error( + t, + "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs", + dataSource, + pollDurationMillis + ); + } + } + + + static boolean isHandOffComplete(List serverView, SegmentDescriptor descriptor) + { + for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { + if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() + == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 + && Iterables.any( + segmentLoadInfo.getServers(), new Predicate() + { + @Override + public boolean apply(DruidServerMetadata input) + { + return input.isAssignable(); + } + } + )) { + return true; + } + } + return false; + } + + @Override + public void stop() + { + scheduledExecutor.shutdown(); + } + + // Used in tests + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java new file mode 100644 index 000000000000..ab08a3a7d1bb --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierConfig.java @@ -0,0 +1,35 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; +import org.joda.time.Period; + +public class CoordinatorBasedSegmentHandoffNotifierConfig +{ + @JsonProperty + public Duration pollDuration = new Period("PT1M").toStandardDuration(); + + public Duration getPollDuration() + { + return pollDuration; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..3b96949754c4 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierFactory.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.inject.Inject; +import io.druid.client.coordinator.CoordinatorClient; + +public class CoordinatorBasedSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory +{ + + private final CoordinatorClient client; + private final CoordinatorBasedSegmentHandoffNotifierConfig config; + + @Inject + public CoordinatorBasedSegmentHandoffNotifierFactory( + CoordinatorClient client, + CoordinatorBasedSegmentHandoffNotifierConfig config + ) + { + this.client = client; + this.config = config; + } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new CoordinatorBasedSegmentHandoffNotifier( + dataSource, + client, + config + ); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1e981781516b..8d5641a4cb59 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -42,8 +41,6 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingQueryRunner; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; @@ -78,7 +75,6 @@ import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -119,7 +115,7 @@ public class RealtimePlumber implements Plumber private final ExecutorService queryExecutorService; private final DataSegmentPusher dataSegmentPusher; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifier handoffNotifier; private final Object handoffCondition = new Object(); private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( @@ -155,7 +151,7 @@ public RealtimePlumber( ExecutorService queryExecutorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, - FilteredServerView serverView, + SegmentHandoffNotifier handoffNotifier, IndexMerger indexMerger, IndexIO indexIO, Cache cache, @@ -173,14 +169,14 @@ public RealtimePlumber( this.queryExecutorService = queryExecutorService; this.dataSegmentPusher = dataSegmentPusher; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifier = handoffNotifier; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; this.objectMapper = objectMapper; - if(!cache.isLocal()) { + if (!cache.isLocal()) { log.error("Configured cache is not local, caching will not be enabled"); } @@ -212,8 +208,8 @@ public Object startJob() { computeBaseDir(schema).mkdirs(); initializeExecutors(); + handoffNotifier.start(); Object retVal = bootstrapSinksFromDisk(); - registerServerViewCallback(); startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); @@ -258,17 +254,8 @@ private Sink getSink(long timestamp) ); retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval)); + addSink(retVal); - try { - segmentAnnouncer.announceSegment(retVal.getSegment()); - sinks.put(truncatedTime, retVal); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); - } - catch (IOException e) { - log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) - .addData("interval", retVal.getInterval()) - .emit(); - } } return retVal; @@ -556,7 +543,7 @@ public void doRun() mergedTarget, config.getIndexSpec() ); - + // emit merge metrics before publishing segment metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); @@ -597,6 +584,17 @@ public void doRun() } } ); + handoffNotifier.registerSegmentHandoffCallback( + new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()), + mergeExecutor, new Runnable() + { + @Override + public void run() + { + abandonSegment(sink.getInterval().getStartMillis(), sink); + } + } + ); } @Override @@ -640,6 +638,7 @@ public String apply(Sink input) } } + handoffNotifier.stop(); shutdownExecutors(); stopped = true; @@ -678,11 +677,11 @@ protected void initializeExecutors() protected void shutdownExecutors() { - // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the - // ServerView sends it a new segment callback + // scheduledExecutor is shutdown here if (scheduledExecutor != null) { scheduledExecutor.shutdown(); persistExecutor.shutdown(); + mergeExecutor.shutdown(); } } @@ -703,7 +702,7 @@ protected Object bootstrapSinksFromDisk() Object metadata = null; long latestCommitTime = 0; for (File sinkDir : files) { - Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); + final Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); //final File[] sinkFiles = sinkDir.listFiles(); // To avoid reading and listing of "merged" dir @@ -735,97 +734,101 @@ public int compare(File o1, File o2) } ); boolean isCorrupted = false; - try { - List hydrants = Lists.newArrayList(); - for (File segmentDir : sinkFiles) { - log.info("Loading previously persisted segment at [%s]", segmentDir); - - // Although this has been tackled at start of this method. - // Just a doubly-check added to skip "merged" dir. from being added to hydrants - // If 100% sure that this is not needed, this check can be removed. - if (Ints.tryParse(segmentDir.getName()) == null) { - continue; - } - QueryableIndex queryableIndex = null; + List hydrants = Lists.newArrayList(); + for (File segmentDir : sinkFiles) { + log.info("Loading previously persisted segment at [%s]", segmentDir); + + // Although this has been tackled at start of this method. + // Just a doubly-check added to skip "merged" dir. from being added to hydrants + // If 100% sure that this is not needed, this check can be removed. + if (Ints.tryParse(segmentDir.getName()) == null) { + continue; + } + QueryableIndex queryableIndex = null; + try { + queryableIndex = indexIO.loadIndex(segmentDir); + } + catch (IOException e) { + log.error(e, "Problem loading segmentDir from disk."); + isCorrupted = true; + } + if (isCorrupted) { try { - queryableIndex = indexIO.loadIndex(segmentDir); - } - catch (IOException e) { - log.error(e, "Problem loading segmentDir from disk."); - isCorrupted = true; + File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); + log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); + FileUtils.copyDirectory(segmentDir, corruptSegmentDir); + FileUtils.deleteDirectory(segmentDir); } - if (isCorrupted) { - try { - File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); - log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); - FileUtils.copyDirectory(segmentDir, corruptSegmentDir); - FileUtils.deleteDirectory(segmentDir); - } - catch (Exception e1) { - log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); - } - //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed - //at some point. - continue; + catch (Exception e1) { + log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); } - Map segmentMetadata = queryableIndex.getMetaData(); - if (segmentMetadata != null) { - Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); - if (timestampObj != null) { - long timestamp = ((Long) timestampObj).longValue(); - if (timestamp > latestCommitTime) { - log.info( - "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", - queryableIndex.getMetaData(), timestamp, latestCommitTime - ); - latestCommitTime = timestamp; - metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); - } + //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed + //at some point. + continue; + } + Map segmentMetadata = queryableIndex.getMetaData(); + if (segmentMetadata != null) { + Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); + if (timestampObj != null) { + long timestamp = ((Long) timestampObj).longValue(); + if (timestamp > latestCommitTime) { + log.info( + "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", + queryableIndex.getMetaData(), timestamp, latestCommitTime + ); + latestCommitTime = timestamp; + metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); } } - hydrants.add( - new FireHydrant( - new QueryableIndexSegment( - DataSegment.makeDataSegmentIdentifier( - schema.getDataSource(), - sinkInterval.getStart(), - sinkInterval.getEnd(), - versioningPolicy.getVersion(sinkInterval), - config.getShardSpec() - ), - queryableIndex - ), - Integer.parseInt(segmentDir.getName()) - ) - ); } - if (hydrants.isEmpty()) { - // Probably encountered a corrupt sink directory - log.warn( - "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", - sinkDir.getAbsolutePath() - ); - continue; - } - Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); - sinks.put(sinkInterval.getStartMillis(), currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - new SingleElementPartitionChunk(currSink) + hydrants.add( + new FireHydrant( + new QueryableIndexSegment( + DataSegment.makeDataSegmentIdentifier( + schema.getDataSource(), + sinkInterval.getStart(), + sinkInterval.getEnd(), + versioningPolicy.getVersion(sinkInterval), + config.getShardSpec() + ), + queryableIndex + ), + Integer.parseInt(segmentDir.getName()) + ) ); - - segmentAnnouncer.announceSegment(currSink.getSegment()); } - catch (IOException e) { - log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) - .addData("interval", sinkInterval) - .emit(); + if (hydrants.isEmpty()) { + // Probably encountered a corrupt sink directory + log.warn( + "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", + sinkDir.getAbsolutePath() + ); + continue; } + final Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); + addSink(currSink); } return metadata; } + private void addSink(final Sink sink) + { + sinks.put(sink.getInterval().getStartMillis(), sink); + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk(sink) + ); + try { + segmentAnnouncer.announceSegment(sink.getSegment()); + } + catch (IOException e) { + log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + protected void startPersistThread() { final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); @@ -939,28 +942,30 @@ private void mergeAndPush() */ protected void abandonSegment(final long truncatedTime, final Sink sink) { - try { - segmentAnnouncer.unannounceSegment(sink.getSegment()); - removeSegment(sink, computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); - sinks.remove(truncatedTime); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - new SingleElementPartitionChunk<>(sink) - ); - for (FireHydrant hydrant : sink) { - cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment())); + if (sinks.containsKey(truncatedTime)) { + try { + segmentAnnouncer.unannounceSegment(sink.getSegment()); + removeSegment(sink, computePersistDir(schema, sink.getInterval())); + log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); + sinks.remove(truncatedTime); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk<>(sink) + ); + for (FireHydrant hydrant : sink) { + cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment())); + } + synchronized (handoffCondition) { + handoffCondition.notifyAll(); + } } - synchronized (handoffCondition) { - handoffCondition.notifyAll(); + catch (Exception e) { + log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); } } - catch (Exception e) { - log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .emit(); - } } protected File computeBaseDir(DataSchema schema) @@ -1044,72 +1049,6 @@ protected int persistHydrant( } } - private void registerServerViewCallback() - { - serverView.registerSegmentCallback( - mergeExecutor, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) - { - if (stopped) { - log.info("Unregistering ServerViewCallback"); - mergeExecutor.shutdown(); - return ServerView.CallbackAction.UNREGISTER; - } - - if (!server.isAssignable()) { - return ServerView.CallbackAction.CONTINUE; - } - - log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - ) { - final Interval interval = segment.getInterval(); - for (Map.Entry entry : sinks.entrySet()) { - final Long sinkKey = entry.getKey(); - if (interval.contains(sinkKey)) { - final Sink sink = entry.getValue(); - log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server); - - final String segmentVersion = segment.getVersion(); - final String sinkVersion = sink.getSegment().getVersion(); - if (segmentVersion.compareTo(sinkVersion) >= 0) { - log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - abandonSegment(sinkKey, sink); - } - } - } - } - - return ServerView.CallbackAction.CONTINUE; - } - }, - new Predicate() - { - @Override - public boolean apply(final DataSegment segment) - { - return - schema.getDataSource().equalsIgnoreCase(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - && Iterables.any( - sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } - ); - } - } - ); - } - private void removeSegment(final Sink sink, final File target) { if (target.exists()) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index f8f28f4a8b47..fe2a94e52809 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; @@ -49,7 +48,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final DataSegmentPusher dataSegmentPusher; private final DataSegmentAnnouncer segmentAnnouncer; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; private final IndexIO indexIO; @@ -64,21 +63,21 @@ public RealtimePlumberSchool( @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject SegmentPublisher segmentPublisher, - @JacksonInject FilteredServerView serverView, + @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, @JacksonInject @Processing ExecutorService executorService, @JacksonInject IndexMerger indexMerger, @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, @JacksonInject ObjectMapper objectMapper - ) + ) { this.emitter = emitter; this.conglomerate = conglomerate; this.dataSegmentPusher = dataSegmentPusher; this.segmentAnnouncer = segmentAnnouncer; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryExecutorService = executorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -107,7 +106,7 @@ public Plumber findPlumber( queryExecutorService, dataSegmentPusher, segmentPublisher, - serverView, + handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), indexMerger, indexIO, cache, @@ -122,7 +121,7 @@ private void verifyState() Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action."); Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action."); Preconditions.checkNotNull(segmentPublisher, "must specify a segmentPublisher to do this action."); - Preconditions.checkNotNull(serverView, "must specify a serverView to do this action."); + Preconditions.checkNotNull(handoffNotifierFactory, "must specify a handoffNotifierFactory to do this action."); Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action."); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java new file mode 100644 index 000000000000..fbccba57bcdc --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java @@ -0,0 +1,52 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import io.druid.query.SegmentDescriptor; + +import java.util.concurrent.Executor; + +public interface SegmentHandoffNotifier +{ + /** + * register a handOffCallback to be called when segment handoff is complete. + * + * @param descriptor segment descriptor for the segment for which handoffCallback is requested + * @param exec executor used to call the runnable + * @param handOffRunnable runnable to be called when segment handoff is complete + */ + boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, + Executor exec, + Runnable handOffRunnable + ); + + /** + * Perform any initial setup. Should be called before using any other methods, and should be paired + * with a corresponding call to {@link #stop()}. + */ + void start(); + + /** + * Perform any final processing and clean up after ourselves. + */ + void stop(); + +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..454487770ed0 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java @@ -0,0 +1,26 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + + +public interface SegmentHandoffNotifierFactory +{ + SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource); +} diff --git a/server/src/main/java/io/druid/server/http/DatasourcesResource.java b/server/src/main/java/io/druid/server/http/DatasourcesResource.java index 40078aa04a6e..09354ca71212 100644 --- a/server/src/main/java/io/druid/server/http/DatasourcesResource.java +++ b/server/src/main/java/io/druid/server/http/DatasourcesResource.java @@ -29,12 +29,20 @@ import com.metamx.common.MapUtils; import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.common.logger.Logger; +import io.druid.client.CoordinatorServerView; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; -import io.druid.client.InventoryView; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; import io.druid.client.indexing.IndexingServiceClient; import io.druid.metadata.MetadataSegmentManager; +import io.druid.query.TableDataSource; import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.PartitionChunk; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -60,13 +68,15 @@ @Path("/druid/coordinator/v1/datasources") public class DatasourcesResource { - private final InventoryView serverInventoryView; + private static final Logger log = new Logger(DatasourcesResource.class); + + private final CoordinatorServerView serverInventoryView; private final MetadataSegmentManager databaseSegmentManager; private final IndexingServiceClient indexingServiceClient; @Inject public DatasourcesResource( - InventoryView serverInventoryView, + CoordinatorServerView serverInventoryView, MetadataSegmentManager databaseSegmentManager, @Nullable IndexingServiceClient indexingServiceClient ) @@ -556,4 +566,55 @@ private Map> getSimpleDatasource(String dataSourceNa segments.put("maxTime", new DateTime(maxTime)); return retVal; } + + /** + * Provides serverView for a datasource and Interval which gives details about servers hosting segments for an interval + * Used by the realtime tasks to fetch a view of the interval they are interested in. + */ + @GET + @Path("/{dataSourceName}/intervals/{interval}/serverview") + @Produces(MediaType.APPLICATION_JSON) + public Response getSegmentDataSourceSpecificInterval( + @PathParam("dataSourceName") String dataSourceName, + @PathParam("interval") String interval, + @QueryParam("partial") final boolean partial + ) + { + TimelineLookup timeline = serverInventoryView.getTimeline( + new TableDataSource(dataSourceName) + ); + final Interval theInterval = new Interval(interval.replace("_", "/")); + if (timeline == null) { + log.debug("No timeline found for datasource[%s]", dataSourceName); + return Response.ok(Lists.newArrayList()).build(); + } + + Iterable> lookup = timeline.lookupWithIncompletePartitions(theInterval); + FunctionalIterable retval = FunctionalIterable + .create(lookup).transformCat( + new Function, Iterable>() + { + @Override + public Iterable apply( + TimelineObjectHolder input + ) + { + return Iterables.transform( + input.getObject(), + new Function, ImmutableSegmentLoadInfo>() + { + @Override + public ImmutableSegmentLoadInfo apply( + PartitionChunk chunk + ) + { + return chunk.getObject().toImmutableSegmentLoadInfo(); + } + } + ); + } + } + ); + return Response.ok(retval).build(); + } } diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 876e4494cc64..571448e9d2dc 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -319,8 +319,7 @@ private void setupViews() throws Exception baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ) { @Override diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java new file mode 100644 index 000000000000..5b31d569fee5 --- /dev/null +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import io.druid.curator.CuratorTestBase; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.TableDataSource; +import io.druid.segment.Segment; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.PartitionHolder; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +public class CoordinatorServerViewTest extends CuratorTestBase +{ + private final ObjectMapper jsonMapper; + private final ZkPathsConfig zkPathsConfig; + private final String announcementsPath; + private final String inventoryPath; + + private CountDownLatch segmentViewInitLatch; + private CountDownLatch segmentAddedLatch; + private CountDownLatch segmentRemovedLatch; + + private ServerInventoryView baseView; + private CoordinatorServerView overlordServerView; + + public CoordinatorServerViewTest() + { + jsonMapper = new DefaultObjectMapper(); + zkPathsConfig = new ZkPathsConfig(); + announcementsPath = zkPathsConfig.getAnnouncementsPath(); + inventoryPath = zkPathsConfig.getLiveSegmentsPath(); + } + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + curator.start(); + } + + @Test + public void testSingleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(1); + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final DruidServer druidServer = new DruidServer( + "localhost:1234", + "localhost:1234", + 10000000L, + "historical", + "default_tier", + 0 + ); + + setupZNodeForServer(druidServer); + + final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); + announceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + List serverLookupRes = (List) timeline.lookup( + new Interval( + "2014-10-20T00:00:00Z/P1D" + ) + ); + Assert.assertEquals(1, serverLookupRes.size()); + + TimelineObjectHolder actualTimelineObjectHolder = serverLookupRes.get(0); + Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); + Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); + + PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(segmentLoadInfo.isEmpty()); + Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + + unannounceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size() + ); + Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1")); + } + + @Test + public void testMultipleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(5); + + // temporarily set latch count to 1 + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final List druidServers = Lists.transform( + ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + new Function() + { + @Override + public DruidServer apply(String input) + { + return new DruidServer( + input, + input, + 10000000L, + "historical", + "default_tier", + 0 + ); + } + } + ); + + for (DruidServer druidServer : druidServers) { + setupZNodeForServer(druidServer); + } + + final List segments = Lists.transform( + ImmutableList.>of( + Pair.of("2011-04-01/2011-04-03", "v1"), + Pair.of("2011-04-03/2011-04-06", "v1"), + Pair.of("2011-04-01/2011-04-09", "v2"), + Pair.of("2011-04-06/2011-04-09", "v3"), + Pair.of("2011-04-01/2011-04-02", "v3") + ), new Function, DataSegment>() + { + @Override + public DataSegment apply(Pair input) + { + return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs); + } + } + ); + + for (int i = 0; i < 5; ++i) { + announceSegmentForServer(druidServers.get(i), segments.get(i)); + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") + unannounceSegmentForServer(druidServers.get(2), segments.get(2)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + // renew segmentRemovedLatch since we still have 4 segments to unannounce + segmentRemovedLatch = new CountDownLatch(4); + + timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)), + createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce all the segments + for (int i = 0; i < 5; ++i) { + // skip the one that was previously unannounced + if (i != 2) { + unannounceSegmentForServer(druidServers.get(i), segments.get(i)); + } + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size() + ); + } + + private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.create() + .compressed() + .withMode(CreateMode.EPHEMERAL) + .forPath( + ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), + jsonMapper.writeValueAsBytes( + ImmutableSet.of(segment) + ) + ); + } + + private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.delete().guaranteed().forPath( + ZKPaths.makePath( + ZKPaths.makePath(inventoryPath, druidServer.getHost()), + segment.getIdentifier() + ) + ); + } + + private Pair>> createExpected( + String intervalStr, + String version, + DruidServer druidServer, + DataSegment segment + ) + { + return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment))); + } + + private void assertValues( + List>>> expected, List actual + ) + { + Assert.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < expected.size(); ++i) { + Pair>> expectedPair = expected.get(i); + TimelineObjectHolder actualTimelineObjectHolder = actual.get(i); + + Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); + Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); + + PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(segmentLoadInfo.isEmpty()); + Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())); + } + } + + private void setupViews() throws Exception + { + baseView = new BatchServerInventoryView( + zkPathsConfig, + curator, + jsonMapper + ) + { + @Override + public void registerSegmentCallback(Executor exec, final SegmentCallback callback) + { + super.registerSegmentCallback( + exec, new SegmentCallback() + { + @Override + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentAdded(server, segment); + segmentAddedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentRemoved(server, segment); + segmentRemovedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentViewInitialized() + { + CallbackAction res = callback.segmentViewInitialized(); + segmentViewInitLatch.countDown(); + return res; + } + } + ); + } + }; + + overlordServerView = new CoordinatorServerView( + baseView + ); + + baseView.start(); + } + + private void setupZNodeForServer(DruidServer server) throws Exception + { + curator.create() + .creatingParentsIfNeeded() + .forPath( + ZKPaths.makePath(announcementsPath, server.getHost()), + jsonMapper.writeValueAsBytes(server.getMetadata()) + ); + curator.create() + .creatingParentsIfNeeded() + .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); + } + + private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) + { + return DataSegment.builder() + .dataSource("test_overlord_server_view") + .interval(new Interval(intervalStr)) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(version) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NoneShardSpec()) + .binaryVersion(9) + .size(0) + .build(); + } + + @After + public void tearDown() throws Exception + { + baseView.stop(); + tearDownServerAndCurator(); + } +} diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index ff58e2ee7347..d6f0d5bc0278 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -20,8 +20,6 @@ package io.druid.client.client; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -33,7 +31,6 @@ import com.metamx.common.ISE; import io.druid.client.BatchServerInventoryView; import io.druid.client.DruidServer; -import io.druid.client.ServerView; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; @@ -47,9 +44,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.LogicalOperator; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -59,9 +53,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -159,8 +151,7 @@ public String getBase() } }, cf, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ); batchServerInventoryView.start(); @@ -175,16 +166,9 @@ public String getBase() } }, cf, - jsonMapper, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS)); - } - } - ){ + jsonMapper + ) + { @Override protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, Set inventory @@ -243,122 +227,6 @@ public void testRun() throws Exception Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); } - @Test - public void testRunWithFilter() throws Exception - { - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - int prevUpdateCount = inventoryUpdateCounter.get(); - // segment outside the range of default filter - DataSegment segment1 = makeSegment(101); - segmentAnnouncer.announceSegment(segment1); - testSegments.add(segment1); - - waitForUpdateEvents(prevUpdateCount + 1); - Assert.assertNull( - Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory()) - .getSegment(segment1.getIdentifier()) - ); - } - - @Test - public void testRunWithFilterCallback() throws Exception - { - final CountDownLatch removeCallbackLatch = new CountDownLatch(1); - - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - - ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class); - Comparator dataSegmentComparator = new Comparator() - { - @Override - public int compare(DataSegment o1, DataSegment o2) - { - return o1.getInterval().equals(o2.getInterval()) ? 0 : -1; - } - }; - - EasyMock - .expect( - callback.segmentAdded( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andReturn(ServerView.CallbackAction.CONTINUE) - .times(1); - - EasyMock - .expect( - callback.segmentRemoved( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andAnswer( - new IAnswer() - { - @Override - public ServerView.CallbackAction answer() throws Throwable - { - removeCallbackLatch.countDown(); - return ServerView.CallbackAction.CONTINUE; - } - } - ) - .times(1); - - - EasyMock.replay(callback); - - filteredBatchServerInventoryView.registerSegmentCallback( - MoreExecutors.sameThreadExecutor(), - callback, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2)); - } - } - ); - - DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2); - segmentAnnouncer.announceSegment(segment2); - testSegments.add(segment2); - - DataSegment oldSegment = makeSegment(-1); - segmentAnnouncer.announceSegment(oldSegment); - testSegments.add(oldSegment); - - segmentAnnouncer.unannounceSegment(oldSegment); - testSegments.remove(oldSegment); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - segmentAnnouncer.unannounceSegment(segment2); - testSegments.remove(segment2); - - waitForSync(filteredBatchServerInventoryView, testSegments); - timing.forWaiting().awaitLatch(removeCallbackLatch); - - EasyMock.verify(callback); - } - private DataSegment makeSegment(int offset) { return DataSegment.builder() @@ -395,7 +263,11 @@ private void waitForUpdateEvents(int count) while (inventoryUpdateCounter.get() != count) { Thread.sleep(100); if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) { - throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get()); + throw new ISE( + "BatchServerInventoryView is not updating counter expected[%d] value[%d]", + count, + inventoryUpdateCounter.get() + ); } } } @@ -459,7 +331,7 @@ public String getBase() List segments = new ArrayList(); try { for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { - segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); + segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); } latch.countDown(); latch.await(); diff --git a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java new file mode 100644 index 000000000000..1fba30287f59 --- /dev/null +++ b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java @@ -0,0 +1,66 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.client.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.IOException; + +public class ImmutableSegmentLoadInfoTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void testSerde() throws IOException + { + ImmutableSegmentLoadInfo segmentLoadInfo = new ImmutableSegmentLoadInfo( + new DataSegment( + "test_ds", + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + null, + null, + null, + new NoneShardSpec(), + 0, 0 + ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1)) + ); + + ImmutableSegmentLoadInfo serde = mapper.readValue( + mapper.writeValueAsBytes(segmentLoadInfo), + ImmutableSegmentLoadInfo.class + ); + + Assert.assertEquals(segmentLoadInfo, serde); + } + +} diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java new file mode 100644 index 000000000000..c57665344644 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -0,0 +1,357 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.ImmutableSegmentLoadInfo; +import io.druid.client.SegmentLoadInfo; +import io.druid.client.coordinator.CoordinatorClient; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CoordinatorBasedSegmentHandoffNotifierTest +{ + + private final CoordinatorBasedSegmentHandoffNotifierConfig notifierConfig = new CoordinatorBasedSegmentHandoffNotifierConfig() + { + @Override + public Duration getPollDuration() + { + return Duration.millis(10); + } + }; + + @Test + public void testHandoffCallbackNotCalled() throws IOException, InterruptedException + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + SegmentDescriptor descriptor = new SegmentDescriptor( + interval, "v1", 2 + ); + DataSegment segment = new DataSegment( + "test_ds", + interval, + "v1", + null, + null, + null, + new NumberedShardSpec(2, 3), + 0, 0 + ); + + CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); + EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) + .andReturn( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + segment, + Sets.newHashSet( + createRealtimeServerMetadata("a1") + ) + ) + ) + ).anyTimes(); + EasyMock.replay(coordinatorClient); + CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( + "test_ds", + coordinatorClient, + notifierConfig + ); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + callbackCalled.set(true); + } + } + ); + notifier.checkForSegmentHandoffs(); + // callback should have registered + Assert.assertEquals(1, notifier.getHandOffCallbacks().size()); + Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor)); + Assert.assertFalse(callbackCalled.get()); + EasyMock.verify(coordinatorClient); + } + + @Test + public void testHandoffCallbackCalled() throws IOException, InterruptedException + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + SegmentDescriptor descriptor = new SegmentDescriptor( + interval, "v1", 2 + ); + DataSegment segment = new DataSegment( + "test_ds", + interval, + "v1", + null, + null, + null, + new NumberedShardSpec(2, 3), + 0, 0 + ); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); + EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) + .andReturn( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + segment, + Sets.newHashSet( + createHistoricalServerMetadata("a1") + ) + ) + ) + ).anyTimes(); + EasyMock.replay(coordinatorClient); + CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( + "test_ds", + coordinatorClient, + notifierConfig + ); + + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + callbackCalled.set(true); + } + } + ); + Assert.assertEquals(1, notifier.getHandOffCallbacks().size()); + Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor)); + notifier.checkForSegmentHandoffs(); + // callback should have been removed + Assert.assertTrue(notifier.getHandOffCallbacks().isEmpty()); + Assert.assertTrue(callbackCalled.get()); + EasyMock.verify(coordinatorClient); + } + + @Test + public void testHandoffChecksForVersion() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v2", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v2", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForAssignableServer() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createRealtimeServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + } + + @Test + public void testHandoffChecksForPartitionNumber() + { + Interval interval = new Interval( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 1) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForInterval() + { + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 1 + ), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-03" + ), "v1", 1 + ) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Lists.newArrayList( + new ImmutableSegmentLoadInfo( + createSegment( + new Interval( + "2011-04-01/2011-04-04" + ), "v1", 1 + ), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor( + new Interval( + "2011-04-02/2011-04-03" + ), "v1", 1 + ) + ) + ); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name) + { + return createServerMetadata(name, "realtime"); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name) + { + return createServerMetadata(name, "historical"); + } + + private DruidServerMetadata createServerMetadata(String name, String type) + { + return new DruidServerMetadata( + name, + name, + 10000, + type, + "tier", + 1 + ); + } + + private DataSegment createSegment(Interval interval, String version, int partitionNumber) + { + return new DataSegment( + "test_ds", + interval, + version, + null, + null, + null, + new NumberedShardSpec(partitionNumber, 100), + 0, 0 + ); + } +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 1f60c8cd7a5e..9d05fac366ce 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -20,7 +20,6 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.base.Suppliers; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,8 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -43,6 +40,7 @@ import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.TestHelper; @@ -89,7 +87,8 @@ public class RealtimePlumberSchoolTest private DataSegmentAnnouncer announcer; private SegmentPublisher segmentPublisher; private DataSegmentPusher dataSegmentPusher; - private FilteredServerView serverView; + private SegmentHandoffNotifier handoffNotifier; + private SegmentHandoffNotifierFactory handoffNotifierFactory; private ServiceEmitter emitter; private RealtimeTuningConfig tuningConfig; private DataSchema schema; @@ -162,17 +161,20 @@ public void setUp() throws Exception segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class); dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class); - serverView = EasyMock.createMock(FilteredServerView.class); - serverView.registerSegmentCallback( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.>anyObject() - ); - EasyMock.expectLastCall().anyTimes(); + handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class); + EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes(); + EasyMock.expect( + handoffNotifier.registerSegmentHandoffCallback( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ) + ).andReturn(true).anyTimes(); emitter = EasyMock.createMock(ServiceEmitter.class); - EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); tuningConfig = new RealtimeTuningConfig( 1, @@ -192,7 +194,7 @@ public void setUp() throws Exception dataSegmentPusher, announcer, segmentPublisher, - serverView, + handoffNotifierFactory, MoreExecutors.sameThreadExecutor(), TestHelper.getTestIndexMerger(), TestHelper.getTestIndexIO(), @@ -208,7 +210,7 @@ public void setUp() throws Exception @After public void tearDown() throws Exception { - EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher,handoffNotifierFactory, handoffNotifier, emitter); FileUtils.deleteDirectory( new File( tuningConfig.getBasePersistDirectory(), @@ -335,15 +337,15 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); plumber2.getSinks() - .put( - 0L, - new Sink( - testInterval, - schema2, - tuningConfig, - new DateTime("2014-12-01T12:34:56.789").toString() - ) - ); + .put( + 0L, + new Sink( + testInterval, + schema2, + tuningConfig, + new DateTime("2014-12-01T12:34:56.789").toString() + ) + ); Assert.assertNull(plumber2.startJob()); final Committer committer = new Committer() @@ -377,14 +379,18 @@ public void run() File persistDir = plumber2.computePersistDir(schema2, testInterval); /* Check that all hydrants were persisted */ - for (int i = 0; i < 5; i ++) { + for (int i = 0; i < 5; i++) { Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists()); } /* Create some gaps in the persisted hydrants and reload */ FileUtils.deleteDirectory(new File(persistDir, "1")); FileUtils.deleteDirectory(new File(persistDir, "3")); - RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + schema2, + tuningConfig, + metrics + ); restoredPlumber.bootstrapSinksFromDisk(); Map sinks = restoredPlumber.getSinks(); @@ -393,26 +399,37 @@ public void run() List hydrants = Lists.newArrayList(sinks.get(new Long(0))); DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z"); Assert.assertEquals(0, hydrants.get(0).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), - hydrants.get(0).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), + hydrants.get(0).getSegment().getDataInterval() + ); Assert.assertEquals(2, hydrants.get(1).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), - hydrants.get(1).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), + hydrants.get(1).getSegment().getDataInterval() + ); Assert.assertEquals(4, hydrants.get(2).getCount()); - Assert.assertEquals(new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), - hydrants.get(2).getSegment().getDataInterval()); + Assert.assertEquals( + new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), + hydrants.get(2).getSegment().getDataInterval() + ); /* Delete all the hydrants and reload, no sink should be created */ FileUtils.deleteDirectory(new File(persistDir, "0")); FileUtils.deleteDirectory(new File(persistDir, "2")); FileUtils.deleteDirectory(new File(persistDir, "4")); - RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber( + schema2, + tuningConfig, + metrics + ); restoredPlumber2.bootstrapSinksFromDisk(); Assert.assertEquals(0, restoredPlumber2.getSinks().size()); } - private InputRow getTestInputRow(final String timeStr) { + private InputRow getTestInputRow(final String timeStr) + { return new InputRow() { @Override diff --git a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java index cfe17189410f..cf3a248f1f00 100644 --- a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java +++ b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.druid.client.CoordinatorServerView; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.InventoryView; @@ -42,7 +43,7 @@ public class DatasourcesResourceTest { - private InventoryView inventoryView; + private CoordinatorServerView inventoryView; private DruidServer server; private List listDataSources; private List dataSegmentList; @@ -50,7 +51,7 @@ public class DatasourcesResourceTest @Before public void setUp() { - inventoryView = EasyMock.createStrictMock(InventoryView.class); + inventoryView = EasyMock.createStrictMock(CoordinatorServerView.class); server = EasyMock.createStrictMock(DruidServer.class); dataSegmentList = new ArrayList<>(); dataSegmentList.add( diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 7c1dbebee075..85dc77f47580 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -29,6 +29,7 @@ import com.metamx.common.logger.Logger; import io.airlift.airline.Command; import io.druid.audit.AuditManager; +import io.druid.client.CoordinatorServerView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.guice.ConfigProvider; import io.druid.guice.Jerseys; @@ -119,6 +120,7 @@ public void configure(Binder binder) .in(ManageLifecycle.class); binder.bind(IndexingServiceClient.class).in(LazySingleton.class); + binder.bind(CoordinatorServerView.class).in(LazySingleton.class); binder.bind(DruidCoordinator.class); @@ -138,6 +140,8 @@ public void configure(Binder binder) Jerseys.addResource(binder, IntervalsResource.class); LifecycleModule.register(binder, Server.class); + LifecycleModule.register(binder, DatasourcesResource.class); + } @Provides diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index ef08e5586189..affa1dc51912 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -33,6 +33,7 @@ import io.airlift.airline.Command; import io.airlift.airline.Option; import io.druid.client.cache.CacheConfig; +import io.druid.client.coordinator.CoordinatorClient; import io.druid.guice.Binders; import io.druid.guice.CacheModule; import io.druid.guice.IndexingServiceFirehoseModule; @@ -73,6 +74,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -130,7 +134,7 @@ public void configure(Binder binder) handlerProviderBinder.addBinding("noop") .to(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class); - + binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); @@ -164,12 +168,22 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule()); + JsonConfigProvider.bind( + binder, + "druid.segment.handoff", + CoordinatorBasedSegmentHandoffNotifierConfig.class + ); + binder.bind(SegmentHandoffNotifierFactory.class) + .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) + .in(LazySingleton.class); + // Override the default SegmentLoaderConfig because we don't actually care about the // configuration based locations. This will override them anyway. This is also stopping // configuration of other parameters, but I don't think that's actually a problem. // Note, if that is actually not a problem, then that probably means we have the wrong abstraction. binder.bind(SegmentLoaderConfig.class) .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList())); + binder.bind(CoordinatorClient.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); Jerseys.addResource(binder, QueryResource.class); diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index d080a6fd08c1..1000b94f998e 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -26,6 +26,7 @@ import com.google.inject.multibindings.MapBinder; import io.druid.cli.QueryJettyServerInitializer; import io.druid.client.cache.CacheConfig; +import io.druid.client.coordinator.CoordinatorClient; import io.druid.metadata.MetadataSegmentPublisher; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.FireDepartment; @@ -36,6 +37,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; +import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -79,10 +83,20 @@ public void configure(Binder binder) .to(NoopChatHandlerProvider.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind(new TypeLiteral>(){}) + binder.bind( + new TypeLiteral>() + { + } + ) .toProvider(FireDepartmentsProvider.class) .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class); + binder.bind(SegmentHandoffNotifierFactory.class) + .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) + .in(LazySingleton.class); + binder.bind(CoordinatorClient.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule());