Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ A sample supervisor spec is shown below:
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. See [next section](#more-on-consumerproperties) for more information.|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
Expand All @@ -147,6 +147,18 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|

#### More on consumerProperties

This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
By default, `isolation.level` is set to `read_committed`. It should be set to `read_uncommitted` if you don't want Druid to consume only committed transactions or working with older versions of Kafka servers with no Transactions support.

There are few cases that require fetching few/all of consumer properties at runtime e.g. when `bootstrap.servers` is not known upfront or not static, to enable SSL connections users might have to provide passwords for `keystore`, `truststore` and `key` secretly.
For such consumer properties, user can implement a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) to supply them at runtime, by adding
`druid.dynamic.config.provider`=`{"type": "<registered_dynamic_config_provider_name>", ...}`
in consumerProperties map.

Note: In 0.20.0 or older Druid versions, for SSL connections, the `keystore`, `truststore` and `key` passwords can also be provided as a [Password Provider](../../operations/password-provider.md). This is deprecated.

#### Specifying data format

Kafka indexing service supports both [`inputFormat`](../../ingestion/data-formats.md#input-format) and [`parser`](../../ingestion/data-formats.md#parser) to specify the data format.
Expand Down
18 changes: 18 additions & 0 deletions docs/development/modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Druid's extensions leverage Guice in order to add things at runtime. Basically,
1. Add new Jersey resources by calling `Jerseys.addResource(binder, clazz)`.
1. Add new Jetty filters by extending `org.apache.druid.server.initialization.jetty.ServletFilterHolder`.
1. Add new secret providers by extending `org.apache.druid.metadata.PasswordProvider`.
1. Add new dynamic configuration providers by extending `org.apache.druid.metadata.DynamicConfigProvider`.
1. Add new ingest transform by implementing the `org.apache.druid.segment.transform.Transform` interface from the `druid-processing` package.
1. Bundle your extension with all the other Druid extensions

Expand Down Expand Up @@ -240,6 +241,23 @@ In your implementation of `org.apache.druid.initialization.DruidModule`, `getJac

where `SomePasswordProvider` is the implementation of `PasswordProvider` interface, you can have a look at `org.apache.druid.metadata.EnvironmentVariablePasswordProvider` for example.

### Adding a new DynamicConfigProvider implementation

You will need to implement `org.apache.druid.metadata.DynamicConfigProvider` interface. For every place where Druid uses DynamicConfigProvider, a new instance of the implementation will be created,
thus make sure all the necessary information required for fetching all information is supplied during object instantiation.
In your implementation of `org.apache.druid.initialization.DruidModule`, `getJacksonModules` should look something like this -

``` java
return ImmutableList.of(
new SimpleModule("SomeDynamicConfigProviderModule")
.registerSubtypes(
new NamedType(SomeDynamicConfigProvider.class, "some")
)
);
```

where `SomeDynamicConfigProvider` is the implementation of `DynamicConfigProvider` interface, you can have a look at `org.apache.druid.metadata.MapStringDynamicConfigProvider` for example.

### Adding a Transform Extension

To create a transform extension implement the `org.apache.druid.segment.transform.Transform` interface. You'll need to install the `druid-processing` package to import `org.apache.druid.segment.transform`.
Expand Down
33 changes: 33 additions & 0 deletions docs/operations/dynamic-config-provider.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
id: dynamic-config-provider
title: "Dynamic Config Providers"
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

Druid's core mechanism of supplying multiple related set of credentials/secrets/configurations via Druid extension mechanism. Currently, it is only supported for providing Kafka Consumer configuration in [Kafka Ingestion](../development/extensions-core/kafka-ingestion.md).

Eventually this will replace [PasswordProvider](./password-provider.md)


Users can create custom extension of the `DynamicConfigProvider` interface that is registered at Druid process startup.

For more information, see [Adding a new DynamicConfigProvider implementation](../development/modules.md#adding-a-new-dynamicconfigprovider-implementation).

2 changes: 2 additions & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Dropwizard
dropwizard
DruidInputSource
DruidSQL
DynamicConfigProvider
EC2
EC2ContainerCredentialsProviderWrapper
ECS
Expand Down Expand Up @@ -217,6 +218,7 @@ colocation
compactable
config
configs
consumerProperties
cron
csv
customizable
Expand Down