
Kafka Ingest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted #10930

@wyndhblb


Affected Version

0.20.1

Description

We have an interesting issue that we've traced to the transactional nature of the upstream producer. The actual error (which halts the entire ingestion forever, and there is no way to skip past it, which is unfortunate in its own right) is below.

The backtrace is:

2021-02-23T22:50:44,663 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test_infra_infrabus_event-50. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:124) ~[?:?]
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:98) ~[?:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:603) ~[druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:267) [druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:145) [druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:451) [druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:423) [druid-indexing-service-0.20.1.jar:0.20.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.kafka.common.InvalidRecordException: Incorrect declared batch size, records still remaining in file
2021-02-23T22:50:44,672 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "xxxx_316d619204b9ebf_bljepncn",
  "status" : "FAILED",
  "duration" : 236717,
  "errorMsg" : "org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test_i...",
  "location" : {
    "host" : null,
    "port" : -1,
    "tlsPort" : -1
  }
}

The hunch is that this may be an aborted producer transaction upstream. It's hard to say the exact cause (it could also be a compression issue causing truncation of some record). In any case, in an effort to rule out a transaction-related problem, isolation.level = read_uncommitted was set; however, it is not obeyed in the downstream peons.
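
For illustration, one plausible way a user-supplied setting can be silently clobbered is if framework-enforced defaults are applied after merging the user's consumerProperties, so the last write wins. This is a hypothetical sketch in Python, not Druid's actual code; the function name and ordering are assumptions:

```python
# Hypothetical sketch: a hard-coded setting applied AFTER merging the
# user's consumerProperties overrides it regardless of what was asked for.
# This is NOT Druid's actual code; names here are purely illustrative.

def build_consumer_config(user_props: dict) -> dict:
    config = {}
    config.update(user_props)  # the user's ioConfig.consumerProperties
    # Framework-enforced settings applied after the user's, so they win:
    config["enable.auto.commit"] = "false"
    config["isolation.level"] = "read_committed"
    return config

user = {
    "isolation.level": "read_uncommitted",
    "group.id": "test-infrabus-druid-minute-1",
}
effective = build_consumer_config(user)
print(effective["isolation.level"])  # prints read_committed, matching the peon log
```

If this is roughly what happens, the requested read_uncommitted would never reach the underlying KafkaConsumer, which is consistent with the peon log below.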

From the peon's logs, the Druid spec shows:

"consumerProperties" : {
      "bootstrap.servers" : "test-infra-kf.di.infra-prod.asapp.com:9092",
      "group.id" : "test-infrabus-druid-minute-1",
      "auto.offset.reset" : "earliest",
      "max.partition.fetch.bytes" : 10485760,
      "fetch.max.bytes" : 52428800,
      "isolation.level" : "read_uncommitted"
    },

... while the Kafka consumer properties logged by the peon show:


allow.auto.create.topics = true
	client.id = consumer-kafka-supervisor-oaiankjn-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafka-supervisor-oaiankjn
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_committed
...
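
A quick way to confirm which requested properties are being ignored is to diff the spec's consumerProperties against the values in the peon's ConsumerConfig dump. A small sketch (the dictionaries below are transcribed from the logs above, normalized to strings):

```python
# Diff requested consumer properties against the effective values the
# peon logged, to spot keys that were overridden.
requested = {
    "group.id": "test-infrabus-druid-minute-1",
    "fetch.max.bytes": "52428800",
    "isolation.level": "read_uncommitted",
}
effective = {  # values from the peon's ConsumerConfig dump
    "group.id": "kafka-supervisor-oaiankjn",
    "fetch.max.bytes": "52428800",
    "isolation.level": "read_committed",
}

overridden = {
    key: (want, effective[key])
    for key, want in requested.items()
    if key in effective and effective[key] != want
}
for key, (want, got) in sorted(overridden.items()):
    print(f"{key}: requested {want!r} but effective {got!r}")
```

Run against the logs above, this flags both isolation.level and group.id as overridden (the group.id rewrite to the supervisor's own ID may be intentional, but the isolation.level one is the problem reported here).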
