Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions common/src/main/java/io/druid/timeline/TimelineLookup.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ public interface TimelineLookup<VersionType, ObjectType>
*/
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);

/**
* Does a lookup for the objects representing the given time interval. Will also return
* incomplete PartitionHolders.
*
* @param interval interval to find objects for
*
* @return Holders representing the interval that the objects exist for, PartitionHolders
* can be incomplete. Holders returned sorted by the interval.
*/
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);

public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inter
}
}

@Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval)
{
try {
lock.readLock().lock();
return lookup(interval, true);
}
finally {
lock.readLock().unlock();
}
}

public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
{
try {
Expand Down
10 changes: 10 additions & 0 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ This config is used to find the [Indexing Service](../design/indexing-service.ht
|--------|-----------|-------|
|`druid.selectors.indexing.serviceName`|The druid.service name of the indexing service Overlord node. To start the Overlord with a different name, set it with this property. |druid/overlord|


### Coordinator Discovery

This config is used to find the [Coordinator](../design/coordinator.html) using Curator service discovery. This config is used by the realtime indexing nodes to get information about the segments loaded in the cluster.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.selectors.coordinator.serviceName`|The druid.service name of the coordinator node. To start the Coordinator with a different name, set it with this property. |druid/coordinator|
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if this is "druid/coordinator" by default, it should be set to "coordinator" in the example common.runtime.properties, since the example coordinator properties set the druid.service to "coordinator". Many people start off their clusters by copying the example configs.

I wonder if we should also just change the defaults in the code to match those…

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the configs were created before we had defaults. I'd rather just change the example configs to match the defaults, or leave it out of the example configs entirely, since it's not necessary.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default overlord service name in tranquility is "overlord" to match the examples, so if we do change the examples we should change that too. All three of those things should match though (druid defaults, tranquility defaults, druid example configs)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2046 to track

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



### Announcing Segments

You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.
Expand Down
2 changes: 2 additions & 0 deletions docs/content/configuration/production-cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ druid.cache.readBufferSize=10485760
# Indexing Service Service Discovery
druid.selectors.indexing.serviceName=druid:prod:overlord

# Coordinator Service Discovery
druid.selectors.coordinator.serviceName=druid:prod:coordinator
```

### Overlord Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
Expand All @@ -45,6 +43,7 @@
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand All @@ -63,14 +62,14 @@ public class TaskToolbox
{
private final TaskConfig config;
private final Task task;
private final TaskActionClientFactory taskActionClientFactory;
private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
Expand All @@ -86,14 +85,14 @@ public class TaskToolbox
public TaskToolbox(
TaskConfig config,
Task task,
TaskActionClientFactory taskActionClientFactory,
TaskActionClient taskActionClient,
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
FilteredServerView newSegmentServerView,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
Expand All @@ -108,14 +107,14 @@ public TaskToolbox(
{
this.config = config;
this.task = task;
this.taskActionClientFactory = taskActionClientFactory;
this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
Expand All @@ -135,7 +134,7 @@ public TaskConfig getConfig()

public TaskActionClient getTaskActionClient()
{
return taskActionClientFactory.create(task);
return taskActionClient;
}

public ServiceEmitter getEmitter()
Expand Down Expand Up @@ -168,9 +167,9 @@ public DataSegmentAnnouncer getSegmentAnnouncer()
return segmentAnnouncer;
}

public FilteredServerView getNewSegmentServerView()
public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory()
{
return newSegmentServerView;
return handoffNotifierFactory;
}

public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
Expand All @@ -38,6 +38,7 @@
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;

import java.io.File;
Expand All @@ -56,7 +57,7 @@ public class TaskToolboxFactory
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
Expand All @@ -77,7 +78,7 @@ public TaskToolboxFactory(
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit, but we should keep the arguments in the same order and just replace newSegmentServerView with handoffNotifierFactory. Makes it easier to track changes and resolve conflicts.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

FilteredServerView newSegmentServerView,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
Expand All @@ -97,7 +98,7 @@ public TaskToolboxFactory(
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
Expand All @@ -112,18 +113,17 @@ public TaskToolboxFactory(
public TaskToolbox build(Task task)
{
final File taskWorkDir = config.getTaskWorkDir(task.getId());

return new TaskToolbox(
config,
task,
taskActionClientFactory,
taskActionClientFactory.create(task),
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
handoffNotifierFactory,
queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public String getVersion(final Interval interval)
toolbox.getSegmentPusher(),
lockingSegmentAnnouncer,
segmentPublisher,
toolbox.getNewSegmentServerView(),
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
toolbox.getIndexIO(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.TaskActionClientFactory;
Expand All @@ -39,6 +38,7 @@
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -66,7 +66,9 @@ public class TaskToolboxTest
private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class);
private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class);
private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class);
private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class);
private SegmentHandoffNotifierFactory mockHandoffNotifierFactory = EasyMock.createNiceMock(
SegmentHandoffNotifierFactory.class
);
private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
= EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
Expand All @@ -86,7 +88,8 @@ public class TaskToolboxTest
public void setUp() throws IOException
{
EasyMock.expect(task.getId()).andReturn("task_id").anyTimes();
EasyMock.replay(task);
EasyMock.expect(task.getDataSource()).andReturn("task_ds").anyTimes();
EasyMock.replay(task, mockHandoffNotifierFactory);

taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null),
Expand All @@ -97,7 +100,7 @@ public void setUp() throws IOException
mockDataSegmentMover,
mockDataSegmentArchiver,
mockSegmentAnnouncer,
mockNewSegmentServerView,
mockHandoffNotifierFactory,
mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
Expand All @@ -122,12 +125,6 @@ public void testGetSegmentAnnouncer()
Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer());
}

@Test
public void testGetNewSegmentServerView()
{
Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView());
}

@Test
public void testGetQueryRunnerFactoryConglomerate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,26 +223,19 @@ private final List<DataSegment> runTask(final IndexTask indexTask) throws Except

indexTask.run(
new TaskToolbox(
null, null, new TaskActionClientFactory()
null, null, new TaskActionClient()
{
@Override
public TaskActionClient create(Task task)
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
return new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
};
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
}, null, new DataSegmentPusher()
{
Expand Down
Loading