diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java index daa38a843722..9d05aa606410 100644 --- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java +++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java @@ -416,7 +416,7 @@ private void remove( } } - private List> lookup(Interval interval, boolean incompleteOk) + public List> lookup(Interval interval, boolean incompleteOk) { List> retVal = new ArrayList>(); NavigableMap timeline = (incompleteOk) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 0ef8510fc1b0..1858b842a506 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -43,6 +43,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -61,14 +62,14 @@ public class TaskToolbox { private final TaskConfig config; private final Task task; - private final TaskActionClientFactory taskActionClientFactory; + private final TaskActionClient taskActionClient; private final ServiceEmitter emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; private final ExecutorService queryExecutorService; @@ -84,14 +85,14 @@ public class TaskToolbox public TaskToolbox( TaskConfig config, Task task, - TaskActionClientFactory taskActionClientFactory, + TaskActionClient taskActionClient, ServiceEmitter emitter, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, + SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -106,14 +107,14 @@ public TaskToolbox( { this.config = config; this.task = task; - this.taskActionClientFactory = taskActionClientFactory; + this.taskActionClient = taskActionClient; this.emitter = emitter; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -133,7 +134,7 @@ public TaskConfig getConfig() public TaskActionClient getTaskActionClient() { - return taskActionClientFactory.create(task); + return taskActionClient; } public ServiceEmitter getEmitter() @@ -166,9 +167,9 @@ public DataSegmentAnnouncer getSegmentAnnouncer() return segmentAnnouncer; } - public FilteredServerView getNewSegmentServerView() + public SegmentHandoffNotifierFactory getHandoffNotifierFactory() { - return newSegmentServerView; + return handoffNotifierFactory; } public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 763a74ff7e03..0f628283206f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -26,9 +26,12 @@ import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -36,6 +39,7 @@ import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import java.io.File; @@ -54,7 +58,7 @@ public class TaskToolboxFactory private final DataSegmentMover dataSegmentMover; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; - private final FilteredServerView newSegmentServerView; + private final TaskActionBasedHandoffNotifierConfig notifierConfig; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; @@ -75,7 +79,6 @@ public TaskToolboxFactory( DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, - FilteredServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, @@ -84,7 +87,8 @@ public TaskToolboxFactory( IndexMerger indexMerger, IndexIO indexIO, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + TaskActionBasedHandoffNotifierConfig notifierConfig ) { this.config = config; @@ -95,7 +99,6 @@ public TaskToolboxFactory( this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; - this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; @@ -105,23 +108,24 @@ public TaskToolboxFactory( this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; + this.notifierConfig = notifierConfig; } public TaskToolbox build(Task task) { final File taskWorkDir = config.getTaskWorkDir(task.getId()); - + TaskActionClient taskActionClient = taskActionClientFactory.create(task); return new TaskToolbox( config, task, - taskActionClientFactory, + taskActionClient, emitter, segmentPusher, dataSegmentKiller, dataSegmentMover, dataSegmentArchiver, segmentAnnouncer, - newSegmentServerView, + buildNotifierFactory(taskActionClient), queryRunnerFactoryConglomerate, queryExecutorService, monitorScheduler, @@ -134,4 +138,9 @@ public TaskToolbox build(Task task) cacheConfig ); } + + // Used in tests + protected SegmentHandoffNotifierFactory buildNotifierFactory(TaskActionClient taskActionClient){ + return new TaskActionBasedHandoffNotifierFactory(taskActionClient, notifierConfig); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentHandoffCheckAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentHandoffCheckAction.java new file mode 100644 index 000000000000..5290663a5ff7 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentHandoffCheckAction.java @@ -0,0 +1,149 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.metamx.common.logger.Logger; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.OverlordServerView; +import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + + +public class SegmentHandoffCheckAction implements TaskAction +{ + private static final Logger log = new Logger(SegmentHandoffCheckAction.class); + @JsonIgnore + private final SegmentDescriptor segmentDescriptor; + + @JsonCreator + public SegmentHandoffCheckAction(@JsonProperty("segmentDescriptor") SegmentDescriptor segmentDescriptor) + { + Preconditions.checkNotNull(segmentDescriptor, "segmentDescriptor"); + this.segmentDescriptor = segmentDescriptor; + } + + @JsonProperty("segmentDescriptor") + public SegmentDescriptor getSegmentDescriptor() + { + return segmentDescriptor; + } + + @Override + public TypeReference getReturnTypeReference() + { + return new TypeReference() + { + }; + } + + @Override + public Boolean perform( + Task task, TaskActionToolbox toolbox + ) throws IOException + { + String dataSource = task.getDataSource(); + OverlordServerView serverView = toolbox.getServerView(); + VersionedIntervalTimeline> timeline = serverView.getTimeline( + new TableDataSource( + dataSource + ) + ); + if (timeline == null) { + log.debug("No timeline found for datasource[%s]", dataSource); + return false; + } + + List>> lookup = timeline.lookup( + segmentDescriptor.getInterval(), + true + ); + + for (TimelineObjectHolder> timelineObjectHolder : lookup) { + if (timelineObjectHolder.getInterval().contains(segmentDescriptor.getInterval()) && + timelineObjectHolder.getVersion().compareTo(segmentDescriptor.getVersion()) >= 0 && + timelineObjectHolder.getObject().getChunk(segmentDescriptor.getPartitionNumber()) != null && + Iterables.any( + timelineObjectHolder.getObject().getChunk(segmentDescriptor.getPartitionNumber()).getObject(), + new Predicate() + { + @Override + public boolean apply(DruidServerMetadata input) + { + return input.isAssignable(); + } + } + )) { + + return true; + } + } + return false; + } + + @Override + public boolean isAudited() + { + return true; + } + + @Override + public String toString() + { + return "SegmentHandoffCheckAction{" + + "segmentDescriptor=" + segmentDescriptor + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SegmentHandoffCheckAction that = (SegmentHandoffCheckAction) o; + + return segmentDescriptor.equals(that.segmentDescriptor); + + } + + @Override + public int hashCode() + { + return segmentDescriptor.hashCode(); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java index 1c0cb386f91e..64ec63a319bc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java @@ -35,7 +35,8 @@ @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class), @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class), @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class), - @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class) + @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class), + @JsonSubTypes.Type(name = "segmentHandoffCheck", value = SegmentHandoffCheckAction.class) }) public interface TaskAction { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java index e85dffdc8994..ce9e2980adce 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java @@ -17,7 +17,6 @@ package io.druid.indexing.common.actions; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.inject.Inject; @@ -26,6 +25,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.OverlordServerView; import io.druid.indexing.overlord.TaskLockbox; import io.druid.timeline.DataSegment; @@ -37,17 +37,20 @@ public class TaskActionToolbox private final TaskLockbox taskLockbox; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final ServiceEmitter emitter; + private final OverlordServerView serverView; @Inject public TaskActionToolbox( TaskLockbox taskLockbox, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, - ServiceEmitter emitter + ServiceEmitter emitter, + OverlordServerView serverView ) { this.taskLockbox = taskLockbox; this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.emitter = emitter; + this.serverView = serverView; } public TaskLockbox getTaskLockbox() @@ -108,4 +111,9 @@ public boolean apply(TaskLock taskLock) return true; } + + public OverlordServerView getServerView() + { + return serverView; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 9c2061f8d7ec..0d33a27ff4fe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -37,6 +37,7 @@ import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockReleaseAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifier; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -55,6 +56,7 @@ import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; +import io.druid.segment.realtime.plumber.RealtimePlumber; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; @@ -285,7 +287,7 @@ public String getVersion(final Interval interval) toolbox.getSegmentPusher(), lockingSegmentAnnouncer, segmentPublisher, - toolbox.getNewSegmentServerView(), + toolbox.getHandoffNotifierFactory(), toolbox.getQueryExecutorService(), toolbox.getIndexMerger(), toolbox.getIndexIO(), diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/OverlordServerView.java b/indexing-service/src/main/java/io/druid/indexing/overlord/OverlordServerView.java new file mode 100644 index 000000000000..d41f37f3deec --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/OverlordServerView.java @@ -0,0 +1,206 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.google.api.client.util.Sets; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.client.DruidServer; +import io.druid.client.ServerInventoryView; +import io.druid.client.ServerView; +import io.druid.concurrent.Execs; +import io.druid.query.DataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * ServerView of overlord for the state of segments being loaded in the cluster. + */ +public class OverlordServerView implements ServerView +{ + private static final Logger log = new Logger(OverlordServerView.class); + + private final Object lock = new Object(); + + private final Map> selectors; + private final Map>> timelines; + + private final ServerInventoryView baseView; + + private volatile boolean initialized = false; + + @Inject + public OverlordServerView( + ServerInventoryView baseView + ) + { + + this.baseView = baseView; + this.selectors = Maps.newHashMap(); + this.timelines = Maps.newHashMap(); + + ExecutorService exec = Execs.singleThreaded("OverlordServerView-%s"); + baseView.registerSegmentCallback( + exec, + new ServerView.SegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + serverAddedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment) + { + serverRemovedSegment(server, segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public CallbackAction segmentViewInitialized() + { + initialized = true; + return ServerView.CallbackAction.CONTINUE; + } + } + ); + + baseView.registerServerCallback( + exec, + new ServerView.ServerCallback() + { + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + removeServer(server); + return ServerView.CallbackAction.CONTINUE; + } + } + ); + } + + public boolean isInitialized() + { + return initialized; + } + + public void clear() + { + synchronized (lock) { + timelines.clear(); + selectors.clear(); + } + } + + private void removeServer(DruidServer server) + { + for (DataSegment segment : server.getSegments().values()) { + serverRemovedSegment(server.getMetadata(), segment); + } + } + + private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) + { + String segmentId = segment.getIdentifier(); + synchronized (lock) { + log.debug("Adding segment[%s] for server[%s]", segment, server); + + Set servers = selectors.get(segmentId); + if (servers == null) { + servers = Sets.newHashSet(); + + VersionedIntervalTimeline> timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + timelines.put(segment.getDataSource(), timeline); + } + + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(servers)); + selectors.put(segmentId, servers); + } + servers.add(server); + } + } + + private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) + { + String segmentId = segment.getIdentifier(); + final Set servers; + + synchronized (lock) { + log.debug("Removing segment[%s] from server[%s].", segmentId, server); + + servers = selectors.get(segmentId); + if (servers == null) { + log.warn("Told to remove non-existant segment[%s]", segmentId); + return; + } + servers.remove(server); + if (servers.isEmpty()) { + VersionedIntervalTimeline> timeline = timelines.get(segment.getDataSource()); + selectors.remove(segmentId); + + final PartitionChunk> removedPartition = timeline.remove( + segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(servers) + ); + + if (removedPartition == null) { + log.warn( + "Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", + segment.getInterval(), + segment.getVersion() + ); + } + } + } + } + + public VersionedIntervalTimeline> getTimeline(DataSource dataSource) + { + String table = Iterables.getOnlyElement(dataSource.getNames()); + synchronized (lock) { + return timelines.get(table); + } + } + + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + baseView.registerServerCallback(exec, callback); + } + + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + baseView.registerSegmentCallback(exec, callback); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifier.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifier.java new file mode 100644 index 000000000000..9e0b49ff9853 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifier.java @@ -0,0 +1,104 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord; + +import com.google.common.collect.Maps; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; +import io.druid.indexing.common.actions.SegmentHandoffCheckAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.query.SegmentDescriptor; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TaskActionBasedHandoffNotifier implements SegmentHandoffNotifier +{ + private static final Logger log = new Logger(TaskActionBasedHandoffNotifier.class); + + private final Map> handOffCallbacks = Maps.newConcurrentMap(); + private final TaskActionClient taskActionClient; + private volatile ScheduledExecutorService scheduledExecutor; + private final long pollDurationMillis; + + public TaskActionBasedHandoffNotifier( + TaskActionClient taskActionClient, + TaskActionBasedHandoffNotifierConfig config + ) + { + this.taskActionClient = taskActionClient; + this.pollDurationMillis = config.getPollDuration().getMillis(); + } + + @Override + public void registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + } + + @Override + public void start() + { + scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d"); + scheduledExecutor.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + Iterator>> itr = handOffCallbacks.entrySet() + .iterator(); + while (itr.hasNext()) { + Map.Entry> entry = itr.next(); + try { + if (taskActionClient.submit(new SegmentHandoffCheckAction(entry.getKey()))) { + entry.getValue().lhs.execute(entry.getValue().rhs); + itr.remove(); + } + } + catch (IOException e) { + log.error("Error while checking Segment Handoff", e); + } + } + } + }, 0L, pollDurationMillis, TimeUnit.MILLISECONDS + ); + } + + @Override + public void stop() + { + scheduledExecutor.shutdown(); + } + + // Used in tests + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierConfig.java new file mode 100644 index 000000000000..6f8fe8d1e2c3 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierConfig.java @@ -0,0 +1,35 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; +import org.joda.time.Period; + +public class TaskActionBasedHandoffNotifierConfig +{ + @JsonProperty + public Duration pollDuration = new Period("PT1M").toStandardDuration(); + + public Duration getPollDuration() + { + return pollDuration; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierFactory.java new file mode 100644 index 000000000000..d995752cac30 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierFactory.java @@ -0,0 +1,42 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord; + +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; + +public class TaskActionBasedHandoffNotifierFactory implements SegmentHandoffNotifierFactory +{ + private final TaskActionClient taskActionClient; + private final TaskActionBasedHandoffNotifierConfig config; + + public TaskActionBasedHandoffNotifierFactory(TaskActionClient taskActionClient, TaskActionBasedHandoffNotifierConfig config) + { + this.taskActionClient = taskActionClient; + this.config = config; + } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new TaskActionBasedHandoffNotifier(taskActionClient, config); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 54fbddc16422..4a9bee069723 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -39,6 +40,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; @@ -66,7 +68,7 @@ public class TaskToolboxTest private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class); private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class); private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class); - private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class); + private SegmentHandoffNotifierFactory mockHandoffNotifierFactory = EasyMock.createMock(SegmentHandoffNotifierFactory.class); private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class); private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); @@ -97,7 +99,6 @@ public void setUp() throws IOException mockDataSegmentMover, mockDataSegmentArchiver, mockSegmentAnnouncer, - mockNewSegmentServerView, mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, @@ -106,7 +107,8 @@ public void setUp() throws IOException mockIndexMerger, mockIndexIO, mockCache, - mockCacheConfig + mockCacheConfig, + new TaskActionBasedHandoffNotifierConfig() ); } @@ -122,12 +124,6 @@ public void testGetSegmentAnnouncer() Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer()); } - @Test - public void testGetNewSegmentServerView() - { - Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView()); - } - @Test public void testGetQueryRunnerFactoryConglomerate() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentHandoffCheckActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentHandoffCheckActionTest.java new file mode 100644 index 000000000000..e06a08f31128 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentHandoffCheckActionTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.OverlordServerView; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.NumberedPartitionChunk; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Set; + +public class SegmentHandoffCheckActionTest +{ + private TaskActionToolbox toolbox; + private OverlordServerView serverView; + private Task task; + private VersionedIntervalTimeline> timeline; + + @Before + public void setup() + { + toolbox = EasyMock.createNiceMock(TaskActionToolbox.class); + serverView = EasyMock.createNiceMock(OverlordServerView.class); + task = EasyMock.createNiceMock(Task.class); + EasyMock.expect(toolbox.getServerView()).andReturn(serverView).anyTimes(); + EasyMock.expect(task.getDataSource()).andReturn("test_ds").anyTimes(); + timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + EasyMock.expect(serverView.getTimeline(new TableDataSource("test_ds"))).andReturn(timeline).anyTimes(); + EasyMock.replay(toolbox, serverView, task); + } + + @After + public void teardown() + { + EasyMock.verify(toolbox, serverView, task); + } + + @Test + public void testSerde() throws IOException + { + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ) + ); + TaskAction serde = jsonMapper.readValue( + jsonMapper.writeValueAsBytes(action), TaskAction.class + ); + Assert.assertTrue(serde instanceof SegmentHandoffCheckAction); + Assert.assertEquals(action, serde); + } + + @Test + public void testEmptyTimeline() throws IOException + { + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ) + ); + Assert.assertFalse(action.perform(task, toolbox)); + } + + @Test + public void testSegmentOnRealtimeNode() throws IOException + { + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ) + ); + timeline.add( + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + NumberedPartitionChunk.make(2, 3, (Set) Sets.newHashSet(createRealtimeServerMetadata("a"))) + ); + Assert.assertFalse(action.perform(task, toolbox)); + } + + @Test + public void testHandoffComplete() throws IOException + { + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ) + ); + timeline.add( + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + NumberedPartitionChunk.make( + 2, + 3, + (Set) Sets.newHashSet( + createRealtimeServerMetadata("a"), + createHistoricalServerMetadata("b") + ) + ) + ); + Assert.assertTrue(action.perform(task, toolbox)); + } + + @Test + public void testOlderVersionSegmentHandedOff() throws IOException + { + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v2", 2 + ) + ); + // historical has older version of segment + timeline.add( + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + NumberedPartitionChunk.make( + 2, + 3, + (Set) Sets.newHashSet( + createRealtimeServerMetadata("a"), + createHistoricalServerMetadata("b") + ) + ) + ); + Assert.assertFalse(action.perform(task, toolbox)); + } + + @Test + public void testHistoricalHasPartialInterval() throws IOException + { + SegmentHandoffCheckAction action = new SegmentHandoffCheckAction( + new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-03" + ), "v1", 2 + ) + ); + // historical segment with partial interval + timeline.add( + new Interval( + "2011-04-01/2011-04-02" + ), + "v1", + NumberedPartitionChunk.make( + 2, + 3, + (Set) Sets.newHashSet( + createRealtimeServerMetadata("a"), + createHistoricalServerMetadata("b") + ) + ) + ); + Assert.assertFalse(action.perform(task, toolbox)); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name) + { + return createServerMetadata(name, "realtime"); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name) + { + return createServerMetadata(name, "historical"); + } + + private DruidServerMetadata createServerMetadata(String name, String type) + { + return new DruidServerMetadata( + name, + name, + 10000, + type, + "tier", + 1 + ); + } + +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index a4d86fd2575e..82ff95b84fa9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -91,7 +91,8 @@ public void before() taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); testDerbyConnector.createPendingSegmentsTable(); testDerbyConnector.createSegmentTable(); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index afa239fed7bf..a65c03f8bb08 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -221,26 +221,19 @@ private final List runTask(final IndexTask indexTask) throws Except indexTask.run( new TaskToolbox( - null, null, new TaskActionClientFactory() + null, null, new TaskActionClient() { @Override - public TaskActionClient create(Task task) + public RetType submit(TaskAction taskAction) throws IOException { - return new TaskActionClient() - { - @Override - public RetType submit(TaskAction taskAction) throws IOException - { - if (taskAction instanceof LockListAction) { - return (RetType) Arrays.asList( - new TaskLock( - "", "", null, new DateTime().toString() - ) - ); - } - return null; - } - }; + if (taskAction instanceof LockListAction) { + return (RetType) Arrays.asList( + new TaskLock( + "", "", null, new DateTime().toString() + ) + ); + } + return null; } }, null, new DataSegmentPusher() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 0c61311d8b58..395c36104881 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; @@ -48,19 +50,20 @@ import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.test.TestDataSegmentAnnouncer; import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.indexing.test.TestDataSegmentPusher; import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; -import io.druid.indexing.test.TestServerView; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; @@ -72,6 +75,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -90,9 +94,10 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.metrics.EventReceiverFirehoseRegister; -import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -107,7 +112,9 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; public class RealtimeIndexTaskTest { @@ -136,6 +143,8 @@ public class RealtimeIndexTaskTest private DateTime now; private ListeningExecutorService taskExec; + private Map> handOffCallbacks; + private SegmentHandoffNotifierFactory handoffNotifierFactory; @Before public void setUp() @@ -204,9 +213,10 @@ public void testBasics() throws Exception Assert.assertEquals(2, countEvents(task)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -294,9 +304,10 @@ public void testRestore() throws Exception Assert.assertEquals(2, countEvents(task2)); // Simulate handoff. - for (DataSegment segment : mdc.getPublished()) { - ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + for(Pair executorRunnablePair : handOffCallbacks.values()){ + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } + handOffCallbacks.clear(); // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); @@ -451,7 +462,8 @@ private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoo final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, mdc, - emitter + emitter, + null ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, @@ -485,6 +497,43 @@ public void registerQuery(Query query, ListenableFuture future) ) ) ); + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() + { + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new SegmentHandoffNotifier() + { + + + @Override + public void registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; + } + }; final TestUtils testUtils = new TestUtils(); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( taskConfig, @@ -495,7 +544,6 @@ public void registerQuery(Query query, ListenableFuture future) null, // DataSegmentMover null, // DataSegmentArchiver new TestDataSegmentAnnouncer(), - new TestServerView(), conglomerate, MoreExecutors.sameThreadExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), @@ -516,8 +564,15 @@ public List getLocations() testUtils.getTestIndexMerger(), testUtils.getTestIndexIO(), MapCache.create(1024), - new CacheConfig() - ); + new CacheConfig(), + new TaskActionBasedHandoffNotifierConfig() + ){ + @Override + protected SegmentHandoffNotifierFactory buildNotifierFactory(TaskActionClient taskActionClient) + { + return handoffNotifierFactory; + } + }; taskLockbox.add(task); return toolboxFactory.build(task); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 6d7b88b69ec1..27d5623a04d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -54,6 +54,7 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.overlord.HeapMemoryTaskStorage; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.indexing.overlord.TaskLockbox; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.query.aggregation.AggregatorFactory; @@ -188,7 +189,7 @@ public void deleteSegments(Set segments) }; final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory( ts, - new TaskActionToolbox(tl, mdc, newMockEmitter()) + new TaskActionToolbox(tl, mdc, newMockEmitter(), null) ); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( @@ -241,7 +242,6 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException } }, null, // segment announcer - null, // new segment server view null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler @@ -262,7 +262,8 @@ public List getLocations() INDEX_MERGER, INDEX_IO, null, - null + null, + new TaskActionBasedHandoffNotifierConfig() ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index c2ee4a84e02d..dc3e5fb92627 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -50,6 +50,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.NoopDimFilter; @@ -309,7 +310,6 @@ public TaskActionClient create(Task task) null, // segment mover null, // segment archiver null, // segment announcer - null, // new segment server view null, // query runner factory conglomerate corporation unionized collective null, // query executor service null, // monitor scheduler @@ -330,7 +330,8 @@ public List getLocations() INDEX_MERGER, INDEX_IO, null, - null + null, + new TaskActionBasedHandoffNotifierConfig() ); final Injector injector = Guice.createInjector( new Module() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordServerViewTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordServerViewTest.java new file mode 100644 index 000000000000..a6c4a5489c1e --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordServerViewTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import io.druid.client.BatchServerInventoryView; +import io.druid.client.DruidServer; +import io.druid.client.ServerInventoryView; +import io.druid.curator.CuratorTestBase; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.TableDataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.PartitionHolder; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +public class OverlordServerViewTest extends CuratorTestBase +{ + private final ObjectMapper jsonMapper; + private final ZkPathsConfig zkPathsConfig; + private final String announcementsPath; + private final String inventoryPath; + + private CountDownLatch segmentViewInitLatch; + private CountDownLatch segmentAddedLatch; + private CountDownLatch segmentRemovedLatch; + + private ServerInventoryView baseView; + private OverlordServerView overlordServerView; + + public OverlordServerViewTest() + { + jsonMapper = new DefaultObjectMapper(); + zkPathsConfig = new ZkPathsConfig(); + announcementsPath = zkPathsConfig.getAnnouncementsPath(); + inventoryPath = zkPathsConfig.getLiveSegmentsPath(); + } + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + curator.start(); + } + + @Test + public void testSingleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(1); + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final DruidServer druidServer = new DruidServer( + "localhost:1234", + "localhost:1234", + 10000000L, + "historical", + "default_tier", + 0 + ); + + setupZNodeForServer(druidServer); + + final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); + announceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + List serverLookupRes = (List) timeline.lookup( + new Interval( + "2014-10-20T00:00:00Z/P1D" + ) + ); + Assert.assertEquals(1, serverLookupRes.size()); + + TimelineObjectHolder> actualTimelineObjectHolder = serverLookupRes.get(0); + Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); + Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); + + PartitionHolder> actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + Set servers = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(servers.isEmpty()); + Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(servers)); + + unannounceSegmentForServer(druidServer, segment); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size() + ); + Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1")); + } + + @Test + public void testMultipleServerAddedRemovedSegment() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(5); + + // temporarily set latch count to 1 + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final List druidServers = Lists.transform( + ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + new Function() + { + @Override + public DruidServer apply(String input) + { + return new DruidServer( + input, + input, + 10000000L, + "historical", + "default_tier", + 0 + ); + } + } + ); + + for (DruidServer druidServer : druidServers) { + setupZNodeForServer(druidServer); + } + + final List segments = Lists.transform( + ImmutableList.>of( + Pair.of("2011-04-01/2011-04-03", "v1"), + Pair.of("2011-04-03/2011-04-06", "v1"), + Pair.of("2011-04-01/2011-04-09", "v2"), + Pair.of("2011-04-06/2011-04-09", "v3"), + Pair.of("2011-04-01/2011-04-02", "v3") + ), new Function, DataSegment>() + { + @Override + public DataSegment apply(Pair input) + { + return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs); + } + } + ); + + for (int i = 0; i < 5; ++i) { + announceSegmentForServer(druidServers.get(i), segments.get(i)); + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") + unannounceSegmentForServer(druidServers.get(2), segments.get(2)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + // renew segmentRemovedLatch since we still have 4 segments to unannounce + segmentRemovedLatch = new CountDownLatch(4); + + timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)), + createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + new Interval( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce all the segments + for (int i = 0; i < 5; ++i) { + // skip the one that was previously unannounced + if (i != 2) { + unannounceSegmentForServer(druidServers.get(i), segments.get(i)); + } + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + Assert.assertEquals( + 0, + ((List) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size() + ); + } + + private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.create() + .compressed() + .withMode(CreateMode.EPHEMERAL) + .forPath( + ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()), + jsonMapper.writeValueAsBytes( + ImmutableSet.of(segment) + ) + ); + } + + private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception + { + curator.delete().guaranteed().forPath( + ZKPaths.makePath( + ZKPaths.makePath(inventoryPath, druidServer.getHost()), + segment.getIdentifier() + ) + ); + } + + private Pair>> createExpected( + String intervalStr, + String version, + DruidServer druidServer, + DataSegment segment + ) + { + return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment))); + } + + private void assertValues( + List>>> expected, List actual + ) + { + Assert.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < expected.size(); ++i) { + Pair>> expectedPair = expected.get(i); + TimelineObjectHolder> actualTimelineObjectHolder = actual.get(i); + + Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); + Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); + + PartitionHolder> actualPartitionHolder = actualTimelineObjectHolder.getObject(); + Assert.assertTrue(actualPartitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); + + Set servers = actualPartitionHolder.iterator().next().getObject(); + Assert.assertFalse(servers.isEmpty()); + Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(servers)); + } + } + + private void setupViews() throws Exception + { + baseView = new BatchServerInventoryView( + zkPathsConfig, + curator, + jsonMapper, + Predicates.alwaysTrue() + ) + { + @Override + public void registerSegmentCallback(Executor exec, final SegmentCallback callback) + { + super.registerSegmentCallback( + exec, new SegmentCallback() + { + @Override + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentAdded(server, segment); + segmentAddedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + { + CallbackAction res = callback.segmentRemoved(server, segment); + segmentRemovedLatch.countDown(); + return res; + } + + @Override + public CallbackAction segmentViewInitialized() + { + CallbackAction res = callback.segmentViewInitialized(); + segmentViewInitLatch.countDown(); + return res; + } + } + ); + } + }; + + overlordServerView = new OverlordServerView( + baseView + ); + + baseView.start(); + } + + private void setupZNodeForServer(DruidServer server) throws Exception + { + curator.create() + .creatingParentsIfNeeded() + .forPath( + ZKPaths.makePath(announcementsPath, server.getHost()), + jsonMapper.writeValueAsBytes(server.getMetadata()) + ); + curator.create() + .creatingParentsIfNeeded() + .forPath(ZKPaths.makePath(inventoryPath, server.getHost())); + } + + private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) + { + return DataSegment.builder() + .dataSource("test_overlord_server_view") + .interval(new Interval(intervalStr)) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(version) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NoneShardSpec()) + .binaryVersion(9) + .size(0) + .build(); + } + + @After + public void tearDown() throws Exception + { + baseView.stop(); + tearDownServerAndCurator(); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierTest.java new file mode 100644 index 000000000000..d59eff24d181 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskActionBasedHandoffNotifierTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentHandoffCheckAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.query.SegmentDescriptor; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TaskActionBasedHandoffNotifierTest +{ + + TaskActionBasedHandoffNotifierConfig notifierConfig = new TaskActionBasedHandoffNotifierConfig() + { + @Override + public Duration getPollDuration() + { + return Duration.millis(10); + } + }; + + @Test + public void testHandoffCallbackNotCalled() throws IOException, InterruptedException + { + SegmentDescriptor descriptor = new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ); + + TaskActionClient client = EasyMock.createMock(TaskActionClient.class); + EasyMock.expect(client.submit(new SegmentHandoffCheckAction(descriptor))).andReturn(false).anyTimes(); + EasyMock.replay(client); + TaskActionBasedHandoffNotifier notifier = new TaskActionBasedHandoffNotifier(client, notifierConfig); + notifier.start(); + final CountDownLatch handOffLatch = new CountDownLatch(1); + + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + handOffLatch.countDown(); + } + } + ); + + // callback should have registered + Assert.assertEquals(1, notifier.getHandOffCallbacks().size()); + Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor)); + Assert.assertFalse(handOffLatch.await(100, TimeUnit.MILLISECONDS)); + notifier.stop(); + EasyMock.verify(client); + } + + @Test + public void testHandoffCallbackCalled() throws IOException, InterruptedException + { + SegmentDescriptor descriptor = new SegmentDescriptor( + new Interval( + "2011-04-01/2011-04-02" + ), "v1", 2 + ); + + TaskActionClient client = EasyMock.createMock(TaskActionClient.class); + EasyMock.expect(client.submit(new SegmentHandoffCheckAction(descriptor))).andReturn(true).anyTimes(); + EasyMock.replay(client); + TaskActionBasedHandoffNotifier notifier = new TaskActionBasedHandoffNotifier( + client, + notifierConfig + ); + notifier.start(); + final CountDownLatch handOffLatch = new CountDownLatch(1); + + notifier.registerSegmentHandoffCallback( + descriptor, MoreExecutors.sameThreadExecutor(), new Runnable() + { + @Override + public void run() + { + handOffLatch.countDown(); + } + } + ); + + // callback should have been called. + Assert.assertTrue(handOffLatch.await(400, TimeUnit.MILLISECONDS)); + Assert.assertEquals(0, notifier.getHandOffCallbacks().size()); + notifier.stop(); + EasyMock.verify(client); + } + +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index c8c4d7f26fd2..d16b655ade12 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -24,16 +24,17 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Event; @@ -41,8 +42,6 @@ import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -59,6 +58,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; @@ -67,7 +67,6 @@ import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; -import io.druid.indexing.common.task.RealtimeIndexTaskTest; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; @@ -76,6 +75,7 @@ import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -96,8 +96,9 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentTest; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -116,7 +117,6 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -195,7 +195,6 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private TaskToolboxFactory tb = null; private IndexSpec indexSpec; private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; - private FilteredServerView serverView; private MonitorScheduler monitorScheduler; private ServiceEmitter emitter; private TaskQueueConfig tqc; @@ -203,7 +202,9 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private int announcedSinks; private static CountDownLatch publishCountDown; private TestDerbyConnector testDerbyConnector; - private List segmentCallbacks = new ArrayList<>(); + private SegmentHandoffNotifierFactory handoffNotifierFactory; + private Map> handOffCallbacks; + private static TestIndexerMetadataStorageCoordinator newMockMDC() { @@ -391,15 +392,41 @@ public void setUp() throws Exception } else { throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); } - - serverView = new FilteredServerView() + handOffCallbacks = Maps.newConcurrentMap(); + handoffNotifierFactory = new SegmentHandoffNotifierFactory() { @Override - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ) + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { - segmentCallbacks.add(callback); + return new SegmentHandoffNotifier() + { + + + @Override + public void registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + } + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + Map> getHandOffCallbacks() + { + return handOffCallbacks; + } + }; } }; setUpAndStartTaskQueue( @@ -427,7 +454,7 @@ private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); - tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); + tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter(), null)); tb = new TaskToolboxFactory( taskConfig, tac, @@ -483,7 +510,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } }, // segment announcer - serverView, // new segment server view queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective null, // query executor service monitorScheduler, // monitor scheduler @@ -504,8 +530,15 @@ public List getLocations() INDEX_MERGER, INDEX_IO, MapCache.create(0), - FireDepartmentTest.NO_CACHE_CONFIG - ); + FireDepartmentTest.NO_CACHE_CONFIG, + new TaskActionBasedHandoffNotifierConfig() + ){ + @Override + protected SegmentHandoffNotifierFactory buildNotifierFactory(TaskActionClient taskActionClient) + { + return handoffNotifierFactory; + } + }; tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); tq.start(); @@ -853,16 +886,10 @@ public void testRealtimeIndexTask() throws Exception Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS)); // Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status - segmentCallbacks.get(0).segmentAdded( - new DruidServerMetadata( - "dummy", - "dummy_host", - 0, - "historical", - "dummy_tier", - 0 - ), mdc.getPublished().iterator().next() - ); + Assert.assertEquals(1, handOffCallbacks.size()); + Pair executorRunnablePair = Iterables.getOnlyElement(handOffCallbacks.values()); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + handOffCallbacks.clear(); // Wait for realtime index task to handle callback in plumber and succeed while (tsqa.getStatus(taskId).get().isRunnable()) { diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 944b3d5625cb..b9bea9503020 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -30,7 +30,11 @@ import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; @@ -46,6 +50,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; +import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -135,6 +140,10 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); + TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); + EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes(); + EasyMock.replay(taskActionClientFactory, taskActionClient); return new WorkerTaskMonitor( jsonMapper, cf, @@ -142,7 +151,8 @@ private WorkerTaskMonitor createTaskMonitor() new ThreadPoolTaskRunner( new TaskToolboxFactory( taskConfig, - null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( + taskActionClientFactory, + null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() @@ -160,7 +170,8 @@ public List getLocations() indexMerger, indexIO, null, - null + null, + new TaskActionBasedHandoffNotifierConfig() ), taskConfig, new NoopServiceEmitter() diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index b4dcf11620a3..d1cfbf009667 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -42,8 +41,6 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingQueryRunner; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; @@ -78,7 +75,6 @@ import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -119,7 +115,7 @@ public class RealtimePlumber implements Plumber private final ExecutorService queryExecutorService; private final DataSegmentPusher dataSegmentPusher; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifier handoffNotifier; private final Object handoffCondition = new Object(); private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( @@ -155,7 +151,7 @@ public RealtimePlumber( ExecutorService queryExecutorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, - FilteredServerView serverView, + SegmentHandoffNotifier handoffNotifier, IndexMerger indexMerger, IndexIO indexIO, Cache cache, @@ -173,14 +169,14 @@ public RealtimePlumber( this.queryExecutorService = queryExecutorService; this.dataSegmentPusher = dataSegmentPusher; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifier = handoffNotifier; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; this.objectMapper = objectMapper; - if(!cache.isLocal()) { + if (!cache.isLocal()) { log.error("Configured cache is not local, caching will not be enabled"); } @@ -212,8 +208,8 @@ public Object startJob() { computeBaseDir(schema).mkdirs(); initializeExecutors(); + handoffNotifier.start(); Object retVal = bootstrapSinksFromDisk(); - registerServerViewCallback(); startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); @@ -258,17 +254,8 @@ private Sink getSink(long timestamp) ); retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval)); + addSink(retVal); - try { - segmentAnnouncer.announceSegment(retVal.getSegment()); - sinks.put(truncatedTime, retVal); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); - } - catch (IOException e) { - log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) - .addData("interval", retVal.getInterval()) - .emit(); - } } return retVal; @@ -556,7 +543,7 @@ public void doRun() mergedTarget, config.getIndexSpec() ); - + // emit merge metrics before publishing segment metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); @@ -640,6 +627,7 @@ public String apply(Sink input) } } + handoffNotifier.stop(); shutdownExecutors(); stopped = true; @@ -678,11 +666,11 @@ protected void initializeExecutors() protected void shutdownExecutors() { - // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the - // ServerView sends it a new segment callback + // scheduledExecutor is shutdown here if (scheduledExecutor != null) { scheduledExecutor.shutdown(); persistExecutor.shutdown(); + mergeExecutor.shutdown(); } } @@ -703,7 +691,7 @@ protected Object bootstrapSinksFromDisk() Object metadata = null; long latestCommitTime = 0; for (File sinkDir : files) { - Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); + final Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); //final File[] sinkFiles = sinkDir.listFiles(); // To avoid reading and listing of "merged" dir @@ -735,97 +723,112 @@ public int compare(File o1, File o2) } ); boolean isCorrupted = false; - try { - List hydrants = Lists.newArrayList(); - for (File segmentDir : sinkFiles) { - log.info("Loading previously persisted segment at [%s]", segmentDir); - - // Although this has been tackled at start of this method. - // Just a doubly-check added to skip "merged" dir. from being added to hydrants - // If 100% sure that this is not needed, this check can be removed. - if (Ints.tryParse(segmentDir.getName()) == null) { - continue; - } - QueryableIndex queryableIndex = null; + List hydrants = Lists.newArrayList(); + for (File segmentDir : sinkFiles) { + log.info("Loading previously persisted segment at [%s]", segmentDir); + + // Although this has been tackled at start of this method. + // Just a doubly-check added to skip "merged" dir. from being added to hydrants + // If 100% sure that this is not needed, this check can be removed. + if (Ints.tryParse(segmentDir.getName()) == null) { + continue; + } + QueryableIndex queryableIndex = null; + try { + queryableIndex = indexIO.loadIndex(segmentDir); + } + catch (IOException e) { + log.error(e, "Problem loading segmentDir from disk."); + isCorrupted = true; + } + if (isCorrupted) { try { - queryableIndex = indexIO.loadIndex(segmentDir); + File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); + log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); + FileUtils.copyDirectory(segmentDir, corruptSegmentDir); + FileUtils.deleteDirectory(segmentDir); } - catch (IOException e) { - log.error(e, "Problem loading segmentDir from disk."); - isCorrupted = true; + catch (Exception e1) { + log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); } - if (isCorrupted) { - try { - File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); - log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); - FileUtils.copyDirectory(segmentDir, corruptSegmentDir); - FileUtils.deleteDirectory(segmentDir); - } - catch (Exception e1) { - log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); - } - //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed - //at some point. - continue; - } - Map segmentMetadata = queryableIndex.getMetaData(); - if (segmentMetadata != null) { - Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); - if (timestampObj != null) { - long timestamp = ((Long) timestampObj).longValue(); - if (timestamp > latestCommitTime) { - log.info( - "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", - queryableIndex.getMetaData(), timestamp, latestCommitTime - ); - latestCommitTime = timestamp; - metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); - } + //Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed + //at some point. + continue; + } + Map segmentMetadata = queryableIndex.getMetaData(); + if (segmentMetadata != null) { + Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); + if (timestampObj != null) { + long timestamp = ((Long) timestampObj).longValue(); + if (timestamp > latestCommitTime) { + log.info( + "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", + queryableIndex.getMetaData(), timestamp, latestCommitTime + ); + latestCommitTime = timestamp; + metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); } } - hydrants.add( - new FireHydrant( - new QueryableIndexSegment( - DataSegment.makeDataSegmentIdentifier( - schema.getDataSource(), - sinkInterval.getStart(), - sinkInterval.getEnd(), - versioningPolicy.getVersion(sinkInterval), - config.getShardSpec() - ), - queryableIndex - ), - Integer.parseInt(segmentDir.getName()) - ) - ); - } - if (hydrants.isEmpty()) { - // Probably encountered a corrupt sink directory - log.warn( - "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", - sinkDir.getAbsolutePath() - ); - continue; } - Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); - sinks.put(sinkInterval.getStartMillis(), currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - new SingleElementPartitionChunk(currSink) + hydrants.add( + new FireHydrant( + new QueryableIndexSegment( + DataSegment.makeDataSegmentIdentifier( + schema.getDataSource(), + sinkInterval.getStart(), + sinkInterval.getEnd(), + versioningPolicy.getVersion(sinkInterval), + config.getShardSpec() + ), + queryableIndex + ), + Integer.parseInt(segmentDir.getName()) + ) ); - - segmentAnnouncer.announceSegment(currSink.getSegment()); } - catch (IOException e) { - log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) - .addData("interval", sinkInterval) - .emit(); + if (hydrants.isEmpty()) { + // Probably encountered a corrupt sink directory + log.warn( + "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", + sinkDir.getAbsolutePath() + ); + continue; } + final Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); + addSink(currSink); } return metadata; } + private void addSink(final Sink sink) + { + sinks.put(sink.getInterval().getStartMillis(), sink); + handoffNotifier.registerSegmentHandoffCallback( + new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()), + mergeExecutor, new Runnable() + { + @Override + public void run() + { + abandonSegment(sink.getInterval().getStartMillis(), sink); + } + } + ); + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk(sink) + ); + try { + segmentAnnouncer.announceSegment(sink.getSegment()); + } + catch (IOException e) { + log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + protected void startPersistThread() { final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); @@ -1044,72 +1047,6 @@ protected int persistHydrant( } } - private void registerServerViewCallback() - { - serverView.registerSegmentCallback( - mergeExecutor, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) - { - if (stopped) { - log.info("Unregistering ServerViewCallback"); - mergeExecutor.shutdown(); - return ServerView.CallbackAction.UNREGISTER; - } - - if (!server.isAssignable()) { - return ServerView.CallbackAction.CONTINUE; - } - - log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - ) { - final Interval interval = segment.getInterval(); - for (Map.Entry entry : sinks.entrySet()) { - final Long sinkKey = entry.getKey(); - if (interval.contains(sinkKey)) { - final Sink sink = entry.getValue(); - log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server); - - final String segmentVersion = segment.getVersion(); - final String sinkVersion = sink.getSegment().getVersion(); - if (segmentVersion.compareTo(sinkVersion) >= 0) { - log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - abandonSegment(sinkKey, sink); - } - } - } - } - - return ServerView.CallbackAction.CONTINUE; - } - }, - new Predicate() - { - @Override - public boolean apply(final DataSegment segment) - { - return - schema.getDataSource().equalsIgnoreCase(segment.getDataSource()) - && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() - && Iterables.any( - sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } - ); - } - } - ); - } - private void removeSegment(final Sink sink, final File target) { if (target.exists()) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 07eea0dbe1c0..78edb3f39927 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -47,7 +47,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final DataSegmentPusher dataSegmentPusher; private final DataSegmentAnnouncer segmentAnnouncer; private final SegmentPublisher segmentPublisher; - private final FilteredServerView serverView; + private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; private final IndexIO indexIO; @@ -62,7 +62,7 @@ public RealtimePlumberSchool( @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject SegmentPublisher segmentPublisher, - @JacksonInject FilteredServerView serverView, + @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, @JacksonInject @Processing ExecutorService executorService, @JacksonInject IndexMerger indexMerger, @JacksonInject IndexIO indexIO, @@ -76,7 +76,7 @@ public RealtimePlumberSchool( this.dataSegmentPusher = dataSegmentPusher; this.segmentAnnouncer = segmentAnnouncer; this.segmentPublisher = segmentPublisher; - this.serverView = serverView; + this.handoffNotifierFactory = handoffNotifierFactory; this.queryExecutorService = executorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -105,7 +105,7 @@ public Plumber findPlumber( queryExecutorService, dataSegmentPusher, segmentPublisher, - serverView, + handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), indexMerger, indexIO, cache, @@ -120,7 +120,7 @@ private void verifyState() Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action."); Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action."); Preconditions.checkNotNull(segmentPublisher, "must specify a segmentPublisher to do this action."); - Preconditions.checkNotNull(serverView, "must specify a serverView to do this action."); + Preconditions.checkNotNull(handoffNotifierFactory, "must specify a handoffNotifierFactory to do this action."); Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action."); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java new file mode 100644 index 000000000000..2e00230e67fa --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifier.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import io.druid.query.SegmentDescriptor; + +import java.util.concurrent.Executor; + +public interface SegmentHandoffNotifier +{ + /** + * register a handOffCallback to be called when segment handoff is complete. + * + * @param descriptor segment descriptor for the segment for which handoffCallback is requested + * @param exec executor used to call the runnable + * @param handOffRunnable runnable to be called when segment handoff is complete + */ + void registerSegmentHandoffCallback(SegmentDescriptor descriptor, + Executor exec, + Runnable handOffRunnable + ); + /** + * Perform any initial setup. Should be called before using any other methods, and should be paired + * with a corresponding call to {@link #stop()}. + */ + void start(); + + /** + * Perform any final processing and clean up after ourselves. + */ + void stop(); + +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..454487770ed0 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SegmentHandoffNotifierFactory.java @@ -0,0 +1,26 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + + +public interface SegmentHandoffNotifierFactory +{ + SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource); +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifier.java new file mode 100644 index 000000000000..860b1a3d7468 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifier.java @@ -0,0 +1,125 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import io.druid.client.FilteredServerView; +import io.druid.client.ServerView; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Executor; + +public class ServerViewSegmentHandoffNotifier implements SegmentHandoffNotifier +{ + private static final Logger log = new Logger(ServerViewSegmentHandoffNotifier.class); + + private final FilteredServerView serverView; + private final String dataSource; + private final Map> handOffCallbacks = Maps.newConcurrentMap(); + + + public ServerViewSegmentHandoffNotifier( + FilteredServerView serverView, + String dataSource + ) + { + this.serverView = serverView; + this.dataSource = dataSource; + + } + + + @Override + public void registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + } + + @Override + public void start() + { + serverView.registerSegmentCallback( + MoreExecutors.sameThreadExecutor(), new ServerView.BaseSegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded( + DruidServerMetadata server, DataSegment segment + ) + { + if (dataSource.equals(segment.getDataSource())) { + Iterator>> itr = handOffCallbacks.entrySet() + .iterator(); + while (itr.hasNext()) { + Map.Entry> entry = itr.next(); + if (segment.getInterval().contains(entry.getKey().getInterval())) { + log.info("Segment[%s] matches sink[%s] on server[%s]", segment, entry.getKey(), server); + + final String segmentVersion = segment.getVersion(); + if (segmentVersion.compareTo(entry.getKey().getVersion()) >= 0) { + log.info("Segment version[%s] >= sink version[%s]", segmentVersion, entry.getKey().getVersion()); + entry.getValue().lhs.execute(entry.getValue().rhs); + itr.remove(); + } + } + } + } + return ServerView.CallbackAction.CONTINUE; + } + }, + new Predicate() + { + @Override + public boolean apply(final DataSegment segment) + { + return + dataSource.equals(segment.getDataSource()) + && Iterables.any( + handOffCallbacks.keySet(), new Predicate() + { + @Override + public boolean apply(SegmentDescriptor sinkKey) + { + return segment.getShardSpec().getPartitionNum() == sinkKey.getPartitionNumber() + && segment.getInterval().contains(sinkKey.getInterval()); + } + } + ); + } + } + ); + } + + @Override + public void stop() + { + // Nothing to cleanup + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifierFactory.java new file mode 100644 index 000000000000..9cb20d3c2d29 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/ServerViewSegmentHandoffNotifierFactory.java @@ -0,0 +1,40 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.plumber; + +import com.google.inject.Inject; +import io.druid.client.FilteredServerView; + +public class ServerViewSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory +{ + private final FilteredServerView serverView; + + @Inject + public ServerViewSegmentHandoffNotifierFactory(FilteredServerView serverView) + { + this.serverView = serverView; + } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new ServerViewSegmentHandoffNotifier(serverView, dataSource); + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 47bf49c10dbe..e0fc6c9d3a9c 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -43,6 +43,7 @@ import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.TestHelper; @@ -89,7 +90,8 @@ public class RealtimePlumberSchoolTest private DataSegmentAnnouncer announcer; private SegmentPublisher segmentPublisher; private DataSegmentPusher dataSegmentPusher; - private FilteredServerView serverView; + private SegmentHandoffNotifierFactory handoffNotifierFactory; + private SegmentHandoffNotifier handoffNotifier; private ServiceEmitter emitter; private RealtimeTuningConfig tuningConfig; private DataSchema schema; @@ -162,17 +164,18 @@ public void setUp() throws Exception segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class); dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class); - serverView = EasyMock.createMock(FilteredServerView.class); - serverView.registerSegmentCallback( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.>anyObject() + handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); + handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class); + EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes(); + handoffNotifier.registerSegmentHandoffCallback(EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() ); EasyMock.expectLastCall().anyTimes(); emitter = EasyMock.createMock(ServiceEmitter.class); - EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); tuningConfig = new RealtimeTuningConfig( 1, @@ -192,7 +195,7 @@ public void setUp() throws Exception dataSegmentPusher, announcer, segmentPublisher, - serverView, + handoffNotifierFactory, MoreExecutors.sameThreadExecutor(), TestHelper.getTestIndexMerger(), TestHelper.getTestIndexIO(), @@ -208,7 +211,7 @@ public void setUp() throws Exception @After public void tearDown() throws Exception { - EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, emitter); FileUtils.deleteDirectory( new File( tuningConfig.getBasePersistDirectory(), diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 0785ed19e07d..c7d87138ffdc 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -52,6 +52,7 @@ import io.druid.indexing.overlord.ForkingTaskRunnerFactory; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.MetadataTaskStorage; +import io.druid.indexing.overlord.OverlordServerView; import io.druid.indexing.overlord.RemoteTaskRunnerFactory; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskMaster; @@ -148,6 +149,7 @@ public void configure(Binder binder) configureTaskStorage(binder); configureRunners(binder); configureAutoscale(binder); + binder.bind(OverlordServerView.class).in(LazySingleton.class); binder.bind(AuditManager.class) .toProvider(AuditManagerProvider.class) diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 5a2b3bbf9ff9..a160b8728b8d 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -52,6 +52,7 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -162,6 +163,12 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule()); + JsonConfigProvider.bind( + binder, + "druid.indexer.task.handoff", + TaskActionBasedHandoffNotifierConfig.class + ); + // Override the default SegmentLoaderConfig because we don't actually care about the // configuration based locations. This will override them anyway. This is also stopping // configuration of other parameters, but I don't think that's actually a problem. diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index c0753fe01fa2..f87d305b467e 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -34,6 +34,8 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.ServerViewSegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -77,10 +79,17 @@ public void configure(Binder binder) .to(NoopChatHandlerProvider.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind(new TypeLiteral>(){}) + binder.bind( + new TypeLiteral>() + { + } + ) .toProvider(FireDepartmentsProvider.class) .in(LazySingleton.class); + binder.bind(SegmentHandoffNotifierFactory.class) + .to(ServerViewSegmentHandoffNotifierFactory.class) + .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); binder.install(new CacheModule());