Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
Expand All @@ -50,6 +53,7 @@
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -289,12 +293,25 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
)
);

LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
new LookupNodeService((String) getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nicer to take the informatino for this in the task spec instead of runtime config. That would give us the ability to set it on a per-task basis.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lookupTier is something that user might want to override so made it overridable via task context key lookupTier in Realtime and Kafka indexing tasks.

lookupNodeService.getName(), lookupNodeService
)
);

try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);

appenderator = appenderator0;

Expand Down Expand Up @@ -597,6 +614,7 @@ public String apply(DataSegment input)
}
}

toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();

return success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import io.druid.data.input.impl.JSONPathSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
Expand Down Expand Up @@ -117,7 +120,9 @@
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -1637,7 +1642,11 @@ public List<StorageLocationConfig> getLocations()
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
testUtils.getTestIndexMergerV9()
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.config.TaskConfig;
Expand All @@ -44,6 +47,7 @@
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -87,6 +91,11 @@ public class TaskToolbox
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;

private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
private final DataNodeService dataNodeService;

public TaskToolbox(
TaskConfig config,
TaskActionClient taskActionClient,
Expand All @@ -107,7 +116,11 @@ public TaskToolbox(
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
)
{
this.config = config;
Expand All @@ -130,6 +143,10 @@ public TaskToolbox(
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
}

public TaskConfig getConfig()
Expand Down Expand Up @@ -271,4 +288,24 @@ public File getPersistDir()
{
return new File(taskWorkDir, "persist");
}

public DruidNodeAnnouncer getDruidNodeAnnouncer()
{
return druidNodeAnnouncer;
}

public LookupNodeService getLookupNodeService()
{
return lookupNodeService;
}

public DataNodeService getDataNodeService()
{
return dataNodeService;
}

public DruidNode getDruidNode()
{
return druidNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
Expand All @@ -39,6 +43,7 @@
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;

Expand Down Expand Up @@ -69,6 +74,10 @@ public class TaskToolboxFactory
private final Cache cache;
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
private final DataNodeService dataNodeService;

@Inject
public TaskToolboxFactory(
Expand All @@ -90,7 +99,11 @@ public TaskToolboxFactory(
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
)
{
this.config = config;
Expand All @@ -112,6 +125,10 @@ public TaskToolboxFactory(
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = indexMergerV9;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
}

public TaskToolbox build(Task task)
Expand All @@ -137,7 +154,11 @@ public TaskToolbox build(Task task)
indexIO,
cache,
cacheConfig,
indexMergerV9
indexMergerV9,
druidNodeAnnouncer,
druidNode,
lookupNodeService,
dataNodeService
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -78,6 +81,8 @@

public class RealtimeIndexTask extends AbstractTask
{
public static final String CTX_KEY_LOOKUP_TIER = "lookupTier";

private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
private final static Random random = new Random();

Expand Down Expand Up @@ -324,8 +329,22 @@ public String getVersion(final Interval interval)
Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();

LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
)
);

try {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);


plumber.startJob();

Expand Down Expand Up @@ -431,6 +450,7 @@ public void run()
}

toolbox.getDataSegmentServerAnnouncer().unannounce();
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
}

log.info("Job done!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ public void setUp() throws IOException
mockIndexIO,
mockCache,
mockCacheConfig,
mockIndexMergerV9
mockIndexMergerV9,
null,
null,
null,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ public Map<String, Object> makeLoadSpec(URI uri)
throw new UnsupportedOperationException();
}
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, indexMergerV9
indexIO, null, null, indexMergerV9, null, null, null, null
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -104,7 +107,9 @@
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
Expand Down Expand Up @@ -1048,7 +1053,11 @@ public List<StorageLocationConfig> getLocations()
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
testUtils.getTestIndexMergerV9()
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
);

return toolboxFactory.build(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
}, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class),
null, null, null, null
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ public List<StorageLocationConfig> getLocations()
INDEX_IO,
null,
null,
INDEX_MERGER_V9
INDEX_MERGER_V9,
null,
null,
null,
null
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ public List<StorageLocationConfig> getLocations()
INDEX_IO,
null,
null,
INDEX_MERGER_V9
INDEX_MERGER_V9,
null,
null,
null,
null
);
final Injector injector = Guice.createInjector(
new Module()
Expand Down
Loading