Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private void remove(
}
}

private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval, boolean incompleteOk)
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval, boolean incompleteOk)
{
List<TimelineObjectHolder<VersionType, ObjectType>> retVal = new ArrayList<TimelineObjectHolder<VersionType, ObjectType>>();
NavigableMap<Interval, TimelineEntry> timeline = (incompleteOk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand All @@ -61,14 +62,14 @@ public class TaskToolbox
{
private final TaskConfig config;
private final Task task;
private final TaskActionClientFactory taskActionClientFactory;
private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
Expand All @@ -84,14 +85,14 @@ public class TaskToolbox
public TaskToolbox(
TaskConfig config,
Task task,
TaskActionClientFactory taskActionClientFactory,
TaskActionClient taskActionClient,
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
FilteredServerView newSegmentServerView,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
Expand All @@ -106,14 +107,14 @@ public TaskToolbox(
{
this.config = config;
this.task = task;
this.taskActionClientFactory = taskActionClientFactory;
this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
Expand All @@ -133,7 +134,7 @@ public TaskConfig getConfig()

public TaskActionClient getTaskActionClient()
{
return taskActionClientFactory.create(task);
return taskActionClient;
}

public ServiceEmitter getEmitter()
Expand Down Expand Up @@ -166,9 +167,9 @@ public DataSegmentAnnouncer getSegmentAnnouncer()
return segmentAnnouncer;
}

public FilteredServerView getNewSegmentServerView()
public SegmentHandoffNotifierFactory getHandoffNotifierFactory()
{
return newSegmentServerView;
return handoffNotifierFactory;
}

public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierConfig;
import io.druid.indexing.overlord.TaskActionBasedHandoffNotifierFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;

import java.io.File;
Expand All @@ -54,7 +58,7 @@ public class TaskToolboxFactory
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final TaskActionBasedHandoffNotifierConfig notifierConfig;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
Expand All @@ -75,7 +79,6 @@ public TaskToolboxFactory(
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
FilteredServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
Expand All @@ -84,7 +87,8 @@ public TaskToolboxFactory(
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
TaskActionBasedHandoffNotifierConfig notifierConfig
)
{
this.config = config;
Expand All @@ -95,7 +99,6 @@ public TaskToolboxFactory(
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
Expand All @@ -105,23 +108,24 @@ public TaskToolboxFactory(
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.notifierConfig = notifierConfig;
}

public TaskToolbox build(Task task)
{
final File taskWorkDir = config.getTaskWorkDir(task.getId());

TaskActionClient taskActionClient = taskActionClientFactory.create(task);
return new TaskToolbox(
config,
task,
taskActionClientFactory,
taskActionClient,
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
buildNotifierFactory(taskActionClient),
queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler,
Expand All @@ -134,4 +138,9 @@ public TaskToolbox build(Task task)
cacheConfig
);
}

// Used in tests
protected SegmentHandoffNotifierFactory buildNotifierFactory(TaskActionClient taskActionClient){
return new TaskActionBasedHandoffNotifierFactory(taskActionClient, notifierConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.OverlordServerView;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;

import java.io.IOException;
import java.util.List;
import java.util.Set;


public class SegmentHandoffCheckAction implements TaskAction<Boolean>
{
private static final Logger log = new Logger(SegmentHandoffCheckAction.class);
@JsonIgnore
private final SegmentDescriptor segmentDescriptor;

@JsonCreator
public SegmentHandoffCheckAction(@JsonProperty("segmentDescriptor") SegmentDescriptor segmentDescriptor)
{
Preconditions.checkNotNull(segmentDescriptor, "segmentDescriptor");
this.segmentDescriptor = segmentDescriptor;
}

@JsonProperty("segmentDescriptor")
public SegmentDescriptor getSegmentDescriptor()
{
return segmentDescriptor;
}

@Override
public TypeReference<Boolean> getReturnTypeReference()
{
return new TypeReference<Boolean>()
{
};
}

@Override
public Boolean perform(
Task task, TaskActionToolbox toolbox
) throws IOException
{
String dataSource = task.getDataSource();
OverlordServerView serverView = toolbox.getServerView();
VersionedIntervalTimeline<String, Set<DruidServerMetadata>> timeline = serverView.getTimeline(
new TableDataSource(
dataSource
)
);
if (timeline == null) {
log.debug("No timeline found for datasource[%s]", dataSource);
return false;
}

List<TimelineObjectHolder<String, Set<DruidServerMetadata>>> lookup = timeline.lookup(
segmentDescriptor.getInterval(),
true
);

for (TimelineObjectHolder<String, Set<DruidServerMetadata>> timelineObjectHolder : lookup) {
if (timelineObjectHolder.getInterval().contains(segmentDescriptor.getInterval()) &&
timelineObjectHolder.getVersion().compareTo(segmentDescriptor.getVersion()) >= 0 &&
timelineObjectHolder.getObject().getChunk(segmentDescriptor.getPartitionNumber()) != null &&
Iterables.any(
timelineObjectHolder.getObject().getChunk(segmentDescriptor.getPartitionNumber()).getObject(),
new Predicate<DruidServerMetadata>()
{
@Override
public boolean apply(DruidServerMetadata input)
{
return input.isAssignable();
}
}
)) {

return true;
}
}
return false;
}

@Override
public boolean isAudited()
{
return true;
}

@Override
public String toString()
{
return "SegmentHandoffCheckAction{" +
"segmentDescriptor=" + segmentDescriptor +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SegmentHandoffCheckAction that = (SegmentHandoffCheckAction) o;

return segmentDescriptor.equals(that.segmentDescriptor);

}

@Override
public int hashCode()
{
return segmentDescriptor.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class),
@JsonSubTypes.Type(name = "segmentHandoffCheck", value = SegmentHandoffCheckAction.class)
})
public interface TaskAction<RetType>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package io.druid.indexing.common.actions;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
Expand All @@ -26,6 +25,7 @@
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.OverlordServerView;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;

Expand All @@ -37,17 +37,20 @@ public class TaskActionToolbox
private final TaskLockbox taskLockbox;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
private final OverlordServerView serverView;

@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter
ServiceEmitter emitter,
OverlordServerView serverView
)
{
this.taskLockbox = taskLockbox;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
this.serverView = serverView;
}

public TaskLockbox getTaskLockbox()
Expand Down Expand Up @@ -108,4 +111,9 @@ public boolean apply(TaskLock taskLock)

return true;
}

public OverlordServerView getServerView()
{
return serverView;
}
}
Loading