
Remove hard limitation that druid(after 0.15.0) only can consume Kafka version 0.11.x or better #10551

Merged
himanshug merged 29 commits into apache:master from zhangyue19921010:modify-kafka-Version
Dec 4, 2020

Conversation

@zhangyue19921010
Contributor

@zhangyue19921010 zhangyue19921010 commented Nov 3, 2020

Fixes #8279

Description

In our production environment, we deployed two versions of Kafka: 0.10.2.1 and 2.4.1. Open source Druid (from 0.15.0 to 0.20.0) can only consume messages from Kafka 0.11.x or later. When consuming from Kafka 0.10.2.1, it throws an UnsupportedVersionException:

2020-11-03T07:40:03,539 WARN [KafkaSupervisor-xxxxx] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxxx]
org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?]


Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].

As the Druid docs say:

The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or better before using this functionality. Refer Kafka upgrade guide if you are using older version of Kafka brokers.

The root cause of this exception (client SDK 2.6.0 + Kafka 0.10.2.1) is the hard-coded props.put("isolation.level", "read_committed"); in KafkaConsumerConfigs, which is unnecessary.
This PR adds a new config named consumeTransactionally in KafkaIndexTaskIOConfig. It defaults to true, so Druid consumes Kafka topics transactionally, the same as the current Druid behavior. Users can set this config to false, which disables transactional consumption and lets Druid consume older Kafka versions such as 0.10.2.1, like:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "timestamp",
        "value"
      ]
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "metrics",
    "inputFormat": {
      "type": "json"
    },
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "consumeTransactionally": false,
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  }
}

With this feature, Druid can consume both Kafka 2.4.1 and Kafka 0.10.2.1 through different supervisor configurations.
Here are the logs of the overlord service:
With consumeTransactionally omitted or set to true:

2020-11-03T08:31:15,470 INFO [LeaderSelector[/druid/overlord/_OVERLORD]] org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = none
	bootstrap.servers = [xxx, xxx, xxx]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-kafka-supervisor-hhphhhag-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafka-supervisor-hhphhhag
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_committed
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 10000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

With consumeTransactionally set to false:

(Identical to the config dump above except for the following lines:)

	client.id = consumer-kafka-supervisor-bhfahfke-1
	group.id = kafka-supervisor-bhfahfke
	isolation.level = read_uncommitted

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • KafkaConsumerConfigs

@zhangyue19921010 zhangyue19921010 changed the title Remove hard limitation that druid only can consume Kafka version 0.11.x or batter Remove hard limitation that druid only can consume Kafka version 0.11.x or better Nov 3, 2020
@zhangyue19921010 zhangyue19921010 changed the title Remove hard limitation that druid only can consume Kafka version 0.11.x or better Remove hard limitation that druid(after 0.15.0) only can consume Kafka version 0.11.x or better Nov 3, 2020
@zhangyue19921010
Contributor Author

zhangyue19921010 commented Nov 5, 2020

@jihoonson Hi Jihoon, sorry for bothering you. Could you please help review my code? Thanks!

props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");
props.put("isolation.level", "read_committed");
Member

@FrankChen021 FrankChen021 Nov 6, 2020


I think removing isolation.level here and leaving the configuration to users changes the current behavior.

Currently users don't need to care about this Kafka configuration when using a newer version of Kafka such as 0.11. If it is removed, they will need to set this property in the supervisor spec, or the default value, read_uncommitted, will be applied, which may not be what they expect.

If this is the root cause that limits Druid to Kafka no lower than 0.11, I think maybe we can introduce another property, in the same way the pollTimeout property works, and unset the isolation.level property according to the value of this new property. Only those who want to use Druid with an older Kafka would need to set this new property.

Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 6, 2020


Yes, it will change the current behavior.

You are right, most people really don't care about this Kafka configuration (they let it be the default behavior). As far as I know, the Kafka producer does not enable the transactions feature by default, and the same is true of the Kafka consumer. So it may be more reasonable for the Druid Kafka indexing service to keep the same default.

Furthermore, if a Druid user wants Druid to consume transactional Kafka topics, I think it is more reasonable to let them set this parameter in consumerProperties, like 'bootstrap.servers', because they know what they are doing and why.

By the way, the fact that Druid 0.15 and later can't consume older Kafka versions really confused me (and I believe not just me). Generally speaking, a newer Kafka consumer client is able to consume from an older Kafka cluster unless it uses a version-specific API. This hard limitation has blocked us from upgrading our Druid cluster from 0.14.2 for a long time. If we don't remove this config, we have to upgrade our production Kafka cluster, which is much heavier work because Kafka has too many consumers, such as Spark and Flink. Orz...

Thanks for reviewing!

Member


We don't know whether or not users have enabled transactional messages in their clusters, so changing the current behavior may cause a backward compatibility problem.

Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 9, 2020


Makes sense! I added a new property named consumeTransactionally in consumerProperties, a Kafka-consumer-level property that controls isolation.level.
If users don't set consumeTransactionally, or set it to true, Druid consumes Kafka transactionally by default.
Setting consumeTransactionally to false disables transactional consumption by unsetting the isolation.level property, which means Druid can now consume older Kafka versions like 0.10.x.
In this way, we don't change the current Druid default behavior, and we provide a way to let Druid consume older Kafka versions.
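The flag-based approach described above can be sketched as follows. This is a hypothetical simplification, not the actual Druid code; the class and method names here are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a consumeTransactionally flag deciding whether
// isolation.level is set on the Kafka consumer at all.
class ConsumerPropsSketch {
    static Map<String, Object> build(boolean consumeTransactionally) {
        Map<String, Object> props = new HashMap<>();
        // properties Druid relies on for exactly-once ingestion stay fixed
        props.put("auto.offset.reset", "none");
        props.put("enable.auto.commit", "false");
        if (consumeTransactionally) {
            // read_committed requires brokers that support transactions (0.11.x+)
            props.put("isolation.level", "read_committed");
        }
        // when the flag is false, isolation.level stays unset, so the Kafka
        // client default (read_uncommitted) applies and older brokers work
        return props;
    }
}
```

With the flag true the props match current Druid behavior; with it false the consumer never sends the transaction-aware request versions that 0.10.x brokers reject.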

Contributor Author


@FrankChen021 Hi Frank, sorry to bother you. I have tested this change in our Druid cluster recently. What should I do next?

Member


Putting this property in KafkaIndexTaskIOConfig.consumerProperties might cause some confusion. consumerProperties is designed to store official Kafka properties. It's better to put the new property in KafkaIndexTaskIOConfig directly and put the code related to this property in KafkaRecordSupplier.addConsumerPropertiesFromConfig.

Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 12, 2020


Hi @FrankChen021, I have put the new property consumeTransactionally in KafkaSupervisorIOConfig directly. If users omit this config or set it to true, Druid consumes Kafka transactionally, the same as current Druid behavior.

Setting consumeTransactionally to false disables transactional consumption by unsetting the isolation.level property; meanwhile Druid can now consume older Kafka versions like 0.10.2.1.

In this way, we don't change the current Druid default behavior, and we provide a way to make Druid able to consume different versions of Kafka, removing the hard limitation that Druid (after 0.15.0) can only consume Kafka 0.11.x or better.

I have tested this PR in our Dev Druid cluster. Please take a look.

Member

@nishantmonu51 nishantmonu51 left a comment


LGTM, 👍

|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`consumeTransactionally`|Boolean|Setting `consumeTransactionally` to false disables transactional Kafka consumption, allowing Druid to consume older Kafka versions such as 0.10.2.1.|no (default == true)|

Member


At the beginning of this doc, there's a paragraph that describes the versions of Kafka that are supported. Since this PR provides a way to support Kafka clusters lower than 0.11, we should also make some changes to the doc:

The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or better before using this functionality. Refer Kafka upgrade guide if you are using older version of Kafka brokers.

Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 17, 2020


@FrankChen021 Done :)

import org.apache.kafka.common.serialization.Deserializer;

import javax.annotation.Nonnull;

Member


please revert this unnecessary change

Contributor Author


Done.

final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));

Member


please revert this unnecessary change

Contributor Author


Done.

workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));

workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));

Member


please revert this unnecessary change

Contributor Author


Done.

@FrankChen021
Member

Hi @jihoonson, would you like to take a look at this change?

@zhangyue19921010
Contributor Author

Do Travis CI - Pull Request again

Member

@FrankChen021 FrankChen021 left a comment


LGTM

@a2l007
Contributor

a2l007 commented Nov 20, 2020

Presently, the read_committed isolation level supports transactional producers and also preserves the behavior for non-transactional producers. If this property is made user-configurable, it is possible that a cluster operator may unintentionally disable it for newer versions of Kafka that support transactional topics. Are there any issues that may arise if we disable this property for newer versions of Kafka? If so, I think we should have some sort of logging that makes the operator aware of it.

Also, with non-transactional producers in older versions of Kafka, it may be reasonable to expect offsets to be sequential. Since we no longer have the offset gap check in Druid, the docs may need to make it clear that Druid doesn't perform this check.
@surekhasaharan Since you introduced the transactional topic support in Druid, I wanted to check whether you have any comments about this PR.

@himanshug
Contributor

himanshug commented Dec 1, 2020

@zhangyue19921010 I think making Druid flexible to work with older versions of Kafka is good, so +1 on that.

regarding the specific implementation here --
does the problem happen if isolation.level=read_uncommitted? If not, then all that is needed is to make isolation.level overridable via the ioConfig instead of introducing another configuration, consumeTransactionally.
That can be achieved by removing props.put("isolation.level", "read_committed"); from KafkaConsumerConfigs.getConsumerProperties() and making that the default in KafkaIndexTaskIOConfig if isolation.level is not explicitly supplied by the user.

does that make sense?

@zhangyue19921010
Contributor Author

zhangyue19921010 commented Dec 1, 2020

@himanshug Thanks for your review.

does the problem happen if isolation.level=read_uncommitted ?

No problem will happen. When isolation.level is set to read_uncommitted, Druid can consume newer Kafka versions non-transactionally, and Druid can also consume older Kafka versions.

I just changed the strategy for setting isolation.level in KafkaConsumerConfigs.getConsumerProperties():

props.put("isolation.level", customerConsumerProperties.getOrDefault("isolation.level", "read_committed"));

If users omit this parameter, Druid sets isolation.level to read_committed by default.
If users set isolation.level to read_committed or read_uncommitted in consumerProperties, Druid sets the parameter as required by the users. In other words, the user property has higher priority.

This solution makes the code changes lighter and safer, with no need to add new parameters. It also does not change the current Kafka consumption behavior, while providing a way to make Druid able to consume older versions of Kafka.
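The getOrDefault strategy can be sketched like this. It is a simplified stand-in for KafkaConsumerConfigs.getConsumerProperties, not the exact Druid code; the surrounding property names are taken from the log dumps above:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: the user-supplied consumerProperties win for
// isolation.level, and read_committed remains the default otherwise.
class IsolationLevelSketch {
    static Map<String, Object> getConsumerProperties(Map<String, Object> customerConsumerProperties) {
        Map<String, Object> props = new HashMap<>();
        // exactly-once-critical settings stay fixed, not user-overridable here
        props.put("auto.offset.reset", "none");
        props.put("enable.auto.commit", "false");
        // only isolation.level is made flexible, via getOrDefault
        props.put("isolation.level",
                  customerConsumerProperties.getOrDefault("isolation.level", "read_committed"));
        return props;
    }
}
```

This keeps the default behavior identical for existing users while letting an operator opt into read_uncommitted for pre-0.11 brokers.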

All the changes have been tested.
[Screenshot: supervisor test results, 2020-12-01]

010 means Kafka version 0.10.2.1
241 means Kafka version 2.4.1
committed means isolation.level was omitted
uncommitted means isolation.level=read_uncommitted was set in ConsumerProperties

The xxx__dev__010__committed supervisor got errors as expected, while the others worked fine. Here is the error message in the Overlord:

2020-12-01T06:56:38,634 WARN [KafkaSupervisor-xxxx__dev__010__committed] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxx__dev__010__committed]
org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStreamForPartition(SeekableStreamSupervisor.java:3081) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStorageForPartition(SeekableStreamSupervisor.java:3031) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.generateStartingSequencesForPartitionGroup(SeekableStreamSupervisor.java:2973) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.createNewTasks(SeekableStreamSupervisor.java:2844) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1066) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$RunNotice.handle(SeekableStreamSupervisor.java:316) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:746) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].

@pphust
Contributor

pphust commented Dec 2, 2020

    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
    final Properties props = new Properties();
    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
    props.putAll(consumerConfigs);

According to the above code, consumerProperties (from KafkaSupervisorIOConfig) will be overwritten by consumerConfigs, which is set in the KafkaConsumerConfigs class.
It looks a bit strange to me that the user's custom settings do NOT take priority over the static settings. Would it be a good idea to just change the order as below? Druid users would get more flexibility in Kafka connection settings, and I think the new isolation.level=read_uncommitted would work well too.

    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
    final Properties props = new Properties();
    **props.putAll(consumerConfigs);**
    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
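The ordering point above can be demonstrated with a small standalone snippet (hypothetical names; it only shows that with `Properties`, whichever write runs last wins for a given key):

```java
import java.util.Properties;

// Demo: the later putAll/setProperty wins, so the merge order decides
// whether user settings or static defaults take priority.
public class PutAllOrderDemo {
    static String resolve(boolean staticLast) {
        Properties props = new Properties();
        Properties user = new Properties();
        user.setProperty("isolation.level", "read_uncommitted"); // from the supervisor spec
        if (staticLast) {
            // current order: static config applied last, overwrites the user
            props.putAll(user);
            props.setProperty("isolation.level", "read_committed");
        } else {
            // proposed order: user properties applied last, override defaults
            props.setProperty("isolation.level", "read_committed");
            props.putAll(user);
        }
        return props.getProperty("isolation.level");
    }
}
```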

@zhangyue19921010
Contributor Author

Hi @pphust

    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
    final Properties props = new Properties();
    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
    props.putAll(consumerConfigs);

According to the above code, consumerProperties (from KafkaSupervisorIOConfig) will be overwritten by consumerConfigs, which is set in the KafkaConsumerConfigs class.
It looks a bit strange to me that the user's custom settings do NOT take priority over the static settings. Would it be a good idea to just change the order as below? Druid users would get more flexibility in Kafka connection settings, and I think the new isolation.level=read_uncommitted would work well too.

    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
    final Properties props = new Properties();
    **props.putAll(consumerConfigs);**
    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);

I also considered the feasibility of this solution before. What concerns me is that some configs, such as props.put("auto.offset.reset", "none") and props.put("enable.auto.commit", "false"), are important for Druid to provide exactly-once ingestion guarantees. If these properties were made user-configurable, users might unintentionally break the exactly-once guarantee. So it may be more appropriate to keep the existing priorities for the important parameters and make other configs like isolation.level flexible through getOrDefault().

{

public static Map<String, Object> getConsumerProperties()
public static Map<String, Object> getConsumerProperties(Map<String, Object> customerConsumerProperties)
Contributor


it would likely be simpler to leave this alone and remove the props.put("isolation.level"... line altogether

and put something like props.put("isolation.level", customerConsumerProperties.getOrDefault("isolation.level", "read_committed")); right after https://github.com/apache/druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java#L78

Contributor Author


Hi @himanshug, thanks for your review!
I agree that it would likely be simpler to leave this alone and remove the props.put("isolation.level"... line altogether. But I think maybe it is not appropriate to modify https://github.com/apache/druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java#L78

I tried making the changes in KafkaSupervisorIOConfig.java as you suggested (cc59d8b):

    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
    consumerProperties.putIfAbsent("isolation.level", "read_committed");
    Preconditions.checkNotNull(

Functional testing is fine, but the UT fails at

Assert.assertEquals(oldConfig.getConsumerProperties(), currentConfig.getConsumerProperties());
and
Assert.assertEquals(currentConfig.getConsumerProperties(), oldConfig.getConsumerProperties());

These failures mean that changes in KafkaSupervisorIOConfig may cause compatibility issues when deserializing from an old IOConfig to a new IOConfig, or vice versa. In other words, we need to ensure that deserialization between the new and old IOConfig is completely consistent and smooth.
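The compatibility failure can be illustrated with a minimal sketch (hypothetical classes, not the real IOConfig): once the new constructor injects a default into consumerProperties, a spec built by the new code no longer equals the same spec as the old code stores it, which is exactly what the equality assertions above check.

```java
import java.util.HashMap;
import java.util.Map;

// Old behavior: consumerProperties stored as given.
class OldIoConfigSketch {
    final Map<String, Object> consumerProperties;
    OldIoConfigSketch(Map<String, Object> props) {
        this.consumerProperties = new HashMap<>(props);
    }
}

// New behavior: the constructor injects a default, changing what would be
// serialized, so old-vs-new round trips no longer compare equal.
class NewIoConfigSketch {
    final Map<String, Object> consumerProperties;
    NewIoConfigSketch(Map<String, Object> props) {
        Map<String, Object> copy = new HashMap<>(props);
        copy.putIfAbsent("isolation.level", "read_committed"); // injected default
        this.consumerProperties = copy;
    }
}
```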

So I switched to a safer way in the latest commit.
All the changes are tested on our Dev cluster.
PTAL :)

Contributor


we need to ensure that the deserialize between new IoConfig and old IoConfig is completely consistent and smooth

Well, my main concern was the consumeTransactionally param, which has been removed. Though I fail to see why simply adding the default to KafkaSupervisorIOConfig.java would not have the same behavior functionally (or for backward compatibility as well)... but I am not so worried about the specifics at this point, as it is easy to change later if needed.

@himanshug
Contributor

+1 after the build

@himanshug himanshug merged commit 229b5f3 into apache:master Dec 4, 2020
@zhangyue19921010
Contributor Author

@FrankChen021 @nishantmonu51 @a2l007 @himanshug @pphust Thanks for your reviews!
And @himanshug, thanks for merging!

@jihoonson jihoonson added this to the 0.21.0 milestone Jan 4, 2021
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jan 22, 2021
…a version 0.11.x or better (apache#10551)

* remove build in kafka consumer config :

* modify druid docs of kafka indexing service

* yuezhang

* modify doc

* modify docs

* fix kafkaindexTaskTest.java

* revert uncessary change

* add more logs and modify docs

* revert jdk version

* modify docs

* modify-kafka-version v2

* modify docs

* modify docs

* modify docs

* modify docs

* modify docs

* done

* remove useless import

* change code and add UT

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
@zhangyue19921010 zhangyue19921010 deleted the modify-kafka-Version branch February 9, 2021 08:44
