Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
43cb678
Add CliIndexer process type and initial task runner implementation
jon-wei Jun 27, 2019
54c2424
Fix HttpRemoteTaskRunnerTest
jon-wei Jul 19, 2019
cd9f9d4
Remove batch sanity check on PeonAppenderatorsManager
jon-wei Jul 19, 2019
780a600
Fix parallel index tests
jon-wei Jul 19, 2019
7fd2e4b
PR comments
jon-wei Jul 19, 2019
8835b0b
Merge remote-tracking branch 'upstream/master' into big_indexer
jon-wei Jul 22, 2019
c5f7a7d
Adjust Jersey resource logging
jon-wei Jul 22, 2019
05af951
Additional cleanup
jon-wei Jul 22, 2019
f4cbf14
Fix SystemSchemaTest
jon-wei Jul 23, 2019
d3e18f9
Add comment to LocalDataSegmentPusherTest absolute path test
jon-wei Jul 23, 2019
f7b182f
More PR comments
jon-wei Jul 23, 2019
37ef2aa
Use Server annotated with RemoteChatHandler
jon-wei Jul 23, 2019
a748e26
More PR comments
jon-wei Jul 23, 2019
8525429
Merge remote-tracking branch 'upstream/master' into big_indexer
jon-wei Jul 23, 2019
a3cc12f
Checkstyle
jon-wei Jul 23, 2019
564bbce
PR comments
jon-wei Jul 24, 2019
f1e6f4b
Merge remote-tracking branch 'upstream/master' into big_indexer
jon-wei Jul 24, 2019
6de3d1f
Add task shutdown to stopGracefully
jon-wei Jul 24, 2019
8e9ce54
Merge remote-tracking branch 'upstream/master' into big_indexer
jon-wei Jul 25, 2019
c68bee6
Small cleanup
jon-wei Jul 25, 2019
ae63057
Compile fix
jon-wei Jul 25, 2019
0d17365
Address PR comments
jon-wei Jul 26, 2019
3be1596
Adjust TaskReportFileWriter and fix nits
jon-wei Jul 26, 2019
bec8f65
Remove unnecessary closer
jon-wei Jul 27, 2019
01c280f
More PR comments
jon-wei Jul 27, 2019
dd8f4ff
Minor adjustments
jon-wei Jul 27, 2019
e3deef6
PR comments
jon-wei Jul 29, 2019
6cf986a
ThreadingTaskRunner: cancel task run future not shutdownFuture and r…
himanshug Jul 29, 2019
7609e3b
Merge pull request #4 from himanshug/pr_8107
jon-wei Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/druid/guice/Jerseys.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.annotations.JSR311Resource;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.logger.Logger;

/**
*/
@PublicApi
public class Jerseys
{
private static final Logger LOG = new Logger(Jerseys.class);

public static void addResource(Binder binder, Class<?> resourceClazz)
{
LOG.info("Adding Jersey resource: " + resourceClazz.getName());
Multibinder.newSetBinder(binder, new TypeLiteral<Class<?>>(){}, JSR311Resource.class)
.addBinding()
.toInstance(resourceClazz);
Expand Down
2 changes: 2 additions & 0 deletions docs/content/querying/lookups.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ The configuration is propagated to the query serving processes (Broker / Router
The query serving processes have an internal API for managing lookups on the process and those are used by the Coordinator.
The Coordinator periodically checks if any of the processes need to load/drop lookups and updates them appropriately.

Please note that a single query serving process can handle at most 2 concurrent lookup configuration propagation requests. This limit is applied to prevent lookup handling from consuming too many server HTTP connections.

# API for configuring lookups

## Bulk update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public class CommonCacheNotifier
NodeType.HISTORICAL,
NodeType.PEON,
NodeType.ROUTER,
NodeType.MIDDLE_MANAGER
NodeType.MIDDLE_MANAGER,
NodeType.INDEXER
);

private final DruidNodeDiscoveryProvider discoveryProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
Expand Down Expand Up @@ -72,6 +73,7 @@ public IncrementalPublishingKafkaIndexTaskRunner(
Optional<ChatHandlerProvider> chatHandlerProvider,
CircularBuffer<Throwable> savedParseExceptions,
RowIngestionMetersFactory rowIngestionMetersFactory,
AppenderatorsManager appenderatorsManager,
LockGranularity lockGranularityToUse
)
{
Expand All @@ -82,6 +84,7 @@ public IncrementalPublishingKafkaIndexTaskRunner(
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory,
appenderatorsManager,
lockGranularityToUse
);
this.task = task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -63,7 +64,8 @@ public KafkaIndexTask(
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject ObjectMapper configMapper
@JacksonInject ObjectMapper configMapper,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(
Expand All @@ -76,7 +78,8 @@ public KafkaIndexTask(
chatHandlerProvider,
authorizerMapper,
rowIngestionMetersFactory,
getFormattedGroupId(dataSchema.getDataSource(), TYPE)
getFormattedGroupId(dataSchema.getDataSource(), TYPE),
appenderatorsManager
);
this.configMapper = configMapper;
this.ioConfig = ioConfig;
Expand Down Expand Up @@ -136,6 +139,7 @@ protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory,
appenderatorsManager,
lockGranularityToUse
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
null,
null,
rowIngestionMetersFactory,
sortingMapper
sortingMapper,
null
));
}
return taskList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
Expand All @@ -68,6 +68,7 @@
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
Expand Down Expand Up @@ -230,6 +232,7 @@ public static Iterable<Object[]> constructorFeeder()
private Long maxTotalRows = null;
private Period intermediateHandoffPeriod = null;

private AppenderatorsManager appenderatorsManager;
private TaskToolboxFactory toolboxFactory;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskStorage taskStorage;
Expand Down Expand Up @@ -372,6 +375,7 @@ public void setupTest() throws IOException
topic = getTopicName();
records = generateRecords(topic);
reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
appenderatorsManager = new TestAppenderatorsManager();
makeToolboxFactory();
}

Expand Down Expand Up @@ -2534,7 +2538,8 @@ private KafkaIndexTask createTask(
null,
null,
rowIngestionMetersFactory,
OBJECT_MAPPER
OBJECT_MAPPER,
appenderatorsManager
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
Expand Down Expand Up @@ -2701,6 +2706,7 @@ public void close()
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
Expand All @@ -2726,7 +2732,7 @@ public void close()
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
new TaskReportFileWriter(reportsFile)
new SingleFileTaskReportFileWriter(reportsFile)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -3556,7 +3557,8 @@ private KafkaIndexTask createKafkaIndexTask(
null,
null,
rowIngestionMetersFactory,
objectMapper
objectMapper,
new DummyForInjectionAppenderatorsManager()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;

Expand All @@ -50,7 +51,8 @@ public KinesisIndexTask(
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject AWSCredentialsConfig awsCredentialsConfig
@JacksonInject AWSCredentialsConfig awsCredentialsConfig,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(
Expand All @@ -63,7 +65,8 @@ public KinesisIndexTask(
chatHandlerProvider,
authorizerMapper,
rowIngestionMetersFactory,
getFormattedGroupId(dataSchema.getDataSource(), TYPE)
getFormattedGroupId(dataSchema.getDataSource(), TYPE),
appenderatorsManager
);
this.awsCredentialsConfig = awsCredentialsConfig;
}
Expand All @@ -79,6 +82,7 @@ protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory,
appenderatorsManager,
lockGranularityToUse
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
Optional<ChatHandlerProvider> chatHandlerProvider,
CircularBuffer<Throwable> savedParseExceptions,
RowIngestionMetersFactory rowIngestionMetersFactory,
AppenderatorsManager appenderatorsManager,
LockGranularity lockGranularityToUse
)
{
Expand All @@ -76,6 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory,
appenderatorsManager,
lockGranularityToUse
);
this.task = task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(
null,
null,
rowIngestionMetersFactory,
awsCredentialsConfig
awsCredentialsConfig,
null
));
}
return taskList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
Expand All @@ -75,6 +75,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
Expand Down Expand Up @@ -141,6 +142,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
Expand Down Expand Up @@ -231,6 +233,7 @@ public static Iterable<Object[]> constructorFeeder()
private final Period intermediateHandoffPeriod = null;
private int maxRecordsPerPoll;

private AppenderatorsManager appenderatorsManager;
private TaskToolboxFactory toolboxFactory;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskStorage taskStorage;
Expand Down Expand Up @@ -316,6 +319,8 @@ public void setupTest() throws IOException, InterruptedException

recordSupplier = mock(KinesisRecordSupplier.class);

appenderatorsManager = new TestAppenderatorsManager();

// sleep required because of kinesalite
Thread.sleep(500);
makeToolboxFactory();
Expand Down Expand Up @@ -2720,7 +2725,8 @@ private KinesisIndexTask createTask(
null,
null,
rowIngestionMetersFactory,
null
null,
appenderatorsManager
);
}

Expand Down Expand Up @@ -2880,6 +2886,7 @@ public void close()
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
Expand All @@ -2905,7 +2912,7 @@ public void close()
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
new TaskReportFileWriter(reportsFile)
new SingleFileTaskReportFileWriter(reportsFile)
);
}

Expand Down Expand Up @@ -3088,7 +3095,8 @@ private TestableKinesisIndexTask(
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject AWSCredentialsConfig awsCredentialsConfig
@JacksonInject AWSCredentialsConfig awsCredentialsConfig,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(
Expand All @@ -3101,7 +3109,8 @@ private TestableKinesisIndexTask(
chatHandlerProvider,
authorizerMapper,
rowIngestionMetersFactory,
awsCredentialsConfig
awsCredentialsConfig,
appenderatorsManager
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -4259,7 +4260,8 @@ private KinesisIndexTask createKinesisIndexTask(
null,
null,
rowIngestionMetersFactory,
null
null,
new DummyForInjectionAppenderatorsManager()
);
}

Expand Down
Loading