From 1752f0754a39352f6bea4b9eab1d7436bdbb33b7 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sun, 25 Sep 2016 10:56:54 -0500 Subject: [PATCH] add context to kafka supervisor for the kafka indexing task --- .../indexing/kafka/supervisor/KafkaSupervisor.java | 2 +- .../kafka/supervisor/KafkaSupervisorSpec.java | 11 +++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 0c1b58eb4742..96879936f273 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1270,7 +1270,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, - ImmutableMap.of(), + spec.getContext(), null ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index d4e0f2055c19..82ad86cb8fef 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -33,11 +33,14 @@ import io.druid.indexing.overlord.supervisor.SupervisorSpec; import io.druid.segment.indexing.DataSchema; +import java.util.Map; + public class KafkaSupervisorSpec implements SupervisorSpec { private final DataSchema dataSchema; private final KafkaSupervisorTuningConfig tuningConfig; private final KafkaSupervisorIOConfig ioConfig; + private final Map context; private final TaskStorage taskStorage; private final TaskMaster taskMaster; @@ -50,6 +53,7 @@ public KafkaSupervisorSpec( @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("context") Map context, @JacksonInject TaskStorage taskStorage, @JacksonInject TaskMaster taskMaster, @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, @@ -77,6 +81,7 @@ public KafkaSupervisorSpec( null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + this.context = context; this.taskStorage = taskStorage; this.taskMaster = taskMaster; @@ -103,6 +108,12 @@ public KafkaSupervisorIOConfig getIoConfig() return ioConfig; } + @JsonProperty + public Map getContext() + { + return context; + } + @Override public String getId() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a7819788722e..231ee9fbe266 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1632,6 +1632,7 @@ public KafkaIndexTaskClient build( dataSchema, tuningConfig, kafkaSupervisorIOConfig, + null, taskStorage, taskMaster, indexerMetadataStorageCoordinator,