This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Conversation

@eolivelli
Contributor

@eolivelli eolivelli commented Dec 16, 2021

When the Kafka client issues a Fetch and sets a maxWait time, we already schedule a DelayedFetch, but there is no way to trigger that fetch early, so it is doomed to wait for the full timeout.
This adds latency spikes to the Kafka consumer.

With this patch we trigger any pending DelayedFetch whenever a record is written to one of the partitions involved in the fetch.

This is only a first implementation; in the future we can improve it so that it does not trigger on the first record but waits for more records to arrive.
With this implementation the fetch result will usually contain only one record, but that is enough to let the Kafka client start a new fetch cycle instead of idling until maxWait expires.

Changes:

  • trigger pending fetches while producing to the topic (a rough sketch is shown below)
  • add a new metric, WAITING_FETCHES_TRIGGERED
  • add DelayedOperation#wakeup, which signals that the operation should wake up due to some trigger (in this case, records being produced to the topic)
  • add a new test that would fail without this patch (because the test asserts that there is no idle cycle in the consumer loop)
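
A rough sketch of the producer-side trigger follows (the purgatory field, the key passed to checkAndComplete, and the counter name are placeholders for illustration, not the exact code):

    // Sketch only: called from the produce callback after records have been written
    // to a partition, so that any pending DelayedFetch watching that partition can
    // complete early instead of waiting for the full maxWait.
    public void notifyPendingFetches(TopicPartition topicPartition) {
        // checkAndComplete walks the watchers registered under this key and tries to
        // complete every pending DelayedFetch for the partition.
        final int completed = fetchPurgatory.checkAndComplete(topicPartition);
        if (completed > 0) {
            // Placeholder counter backing the new WAITING_FETCHES_TRIGGERED metric.
            waitingFetchesTriggered.add(completed);
        }
    }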

Collaborator

@BewareMyPower BewareMyPower left a comment


I think adding a parameter to tryComplete brings too many changes. Most of the call sites would only pass a false argument.

I think it's better to add a wakeup() method to DelayedOperation:

    public boolean wakeup() {
        // No ops
        return true;
    }

Then in DelayedFetch, override this method.

    @Override
    public boolean wakeup() {
        // If we get here, the fetch was waiting for data and someone has just
        // written messages to one of the topics it watches,
        // so trigger the fetch again from scratch.
        restarted.set(true);
        messageFetchContext.onDataWrittenToSomePartition();
        return true;
    }

Finally, modify DelayedOperationPurgatory#Watchers#tryCompleteWatched:

                } else if (curr.wakeup() && curr.maybeTryComplete()) {

BTW, I used a small trick here: wakeup() returns a boolean so that the wakeup() and maybeTryComplete() calls can be combined on one line. The wakeup() method always returns true.

Contributor Author

@eolivelli eolivelli left a comment


@BewareMyPower I have addressed your comments.

Nice suggestions!

@BewareMyPower
Collaborator

I've left some comments, PTAL. BTW, please update the PR description because the current design has changed a bit from the initial one.

eolivelli and others added 2 commits December 21, 2021 10:34
…essageFetchContext.java

Co-authored-by: Yunze Xu <xyzinfernity@163.com>
…elayedFetch.java

Co-authored-by: Yunze Xu <xyzinfernity@163.com>
@eolivelli
Contributor Author

@BewareMyPower description updated.
If you want, I can drop the "key" parameter as it is currently unused.

@eolivelli
Contributor Author

@BewareMyPower I have removed that parameter.
PTAL

@eolivelli
Contributor Author

@BewareMyPower @Demogorgon314 CI passed

@BewareMyPower BewareMyPower merged commit 37f0583 into streamnative:master Dec 22, 2021
BewareMyPower pushed a commit that referenced this pull request Dec 29, 2021
BewareMyPower pushed a commit that referenced this pull request Dec 29, 2021
@BewareMyPower
Collaborator

BewareMyPower commented Jan 26, 2022

I'll continue the discussion here. For the previous discussion:

  1. See [BUG]openmessage Rebalance failed. Unable to consume #1032 for the bug caused by this PR.
  2. Fix NPE caused by empty polls for a consumer of multiple partitions #1033 tried to fix the bug, but it turned out to be far harder than I thought.
  3. Then Revert "Fetch: trigger pending fetches when producing messages. (#973)" #1034 reverted this PR.

The root cause is that the delayed fetch (DelayedFetch) holds a fetch context (MessageFetchContext), but the delayed fetch doesn't know whether the fetch context has been recycled. This PR calls KafkaRequestHandler#notifyPendingFetches in the produce callback, which runs DelayedOperationPurgatory#checkAndComplete in a pulsar-io thread. The stack is:

DelayedOperationPurgatory#checkAndComplete
  DelayedOperationPurgatory#Watchers#tryCompleteWatched
    DelayedOperation#maybeTryComplete
      DelayedFetch#tryComplete
        MessageFetchContext#onDataWrittenToSomePartition  [1]
          MessageFetchContext#handleFetch                 [2]

However, the methods of MessageFetchContext are usually called in a BookKeeperClientWorker-OrderedExecutor thread, so a race condition occurs.

The bug happens when a consumer fetches multiple partitions (the exact reason is still to be determined).

The first commit of #1033 tries to remove the associated delayed fetches from the purgatory. It fixes the NPE at [1], though the patch hurts performance. However, even ignoring the performance overhead, it's very hard to solve the race condition at [2].

The underlying problem is that this PR makes MessageFetchContext#handleFetch run in a pulsar-io thread, which is too complicated to handle safely.

A typical error is

io.streamnative.pulsar.handlers.kop.utils.KopTopic$KoPTopicIllegalArgumentException: Invalid short topic name 'my-topic', it should be in the format of <tenant>/<namespace>/<topic> or <topic>
    at io.streamnative.pulsar.handlers.kop.utils.KopTopic.expandToFullName(KopTopic.java:77) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.utils.KopTopic.<init>(KopTopic.java:59) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.utils.KopTopic.toString(KopTopic.java:96) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$handleFetch$4(MessageFetchContext.java:330) ~[?:?]

This is because, by the time handleFetch is called, the fetch context has already been recycled and namespacePrefix is null. Then the following code crashes:

            final String fullTopicName = KopTopic.toString(topicPartition, namespacePrefix);

I tried adding some null checks, but it's hard to solve the problem thoroughly.
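
For illustration, here is a minimal sketch of that kind of null-check guard (isRecycled and the fields it checks are assumptions, not the actual MessageFetchContext code); it only narrows the race window, because the context can still be recycled between the check and the rest of handleFetch:

    // Sketch only: skip the fetch if the context has already been recycled,
    // i.e. its fields have been reset to null by recycle().
    private boolean isRecycled() {
        return namespacePrefix == null || requestHandler == null;
    }

    void handleFetch() {
        if (isRecycled()) {
            // The fetch this context belonged to has already been completed and
            // recycled, so a wakeup coming from the produce path must be a no-op.
            return;
        }
        // ... original handleFetch logic ...
    }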

In addition, here is example code that can reproduce the bug against a KoP standalone (the KafkaUtils helpers it uses are sketched after the snippet).

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final String topic = "my-topic";
        try (AdminClient client = AdminClient.create(KafkaUtils.newAdminProperties())) {
            client.createTopics(Collections.singletonList(new NewTopic(topic, 16, (short) 2))).all().get();
        }

        int n = 0;
        final int numMessages = 10000;
        final AtomicInteger numReceived = new AtomicInteger(0);
        final Object object = new Object();
        final AtomicBoolean consumeFailed = new AtomicBoolean(false);

        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final Future<?> future = executor.submit(() -> {
            final Properties props = KafkaUtils.newKafkaProducerProperties();
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
            final CountDownLatch latch = new CountDownLatch(numMessages);
            final String value = newValue(100);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                synchronized (object) {
                    object.wait();
                }
                for (int i = 0; i < numMessages; i++) {
                    if (consumeFailed.get()) {
                        break;
                    }
                    final int index = i;
                    producer.send(new ProducerRecord<>(topic, value), (recordMetadata, e) -> {
                        if (e != null) {
                            log.error("Failed to send {}: {}", index, e.getMessage());
                        }
                        latch.countDown();
                    });
                    Thread.sleep(1);
                }
                if (!consumeFailed.get()) {
                    latch.await();
                }
            } catch (Exception e) {
                log.error("Failed to consume", e);
            }
        });

        final Properties props = KafkaUtils.newKafkaConsumerProperties();
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    // No ops
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    synchronized (object) {
                        object.notifyAll();
                    }
                }
            });
            consumer.poll(Duration.ofMillis(3000));
            while (n < 10000) {
                final Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    log.info("Received from {}-{}@{}", record.topic(), record.partition(), record.offset());
                    offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    n++;
                    numReceived.incrementAndGet();
                }
                consumer.commitAsync(offsetMap, null);
            }
        } catch (Exception e) {
            log.error("Failed to consume at {}", numReceived, e);
            consumeFailed.set(true);
        }

        future.get();
        executor.shutdown();
    }

    private static String newValue(int size) {
        final byte[] bytes = new byte[size];
        Arrays.fill(bytes, (byte) 'a');
        return new String(bytes);
    }
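
The KafkaUtils helpers used above are not shown; something like the following is assumed (localhost:9092 for a default KoP standalone, the rest is ordinary Kafka client configuration):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Assumed helper class for the reproduction code above.
    public class KafkaUtils {

        private static final String BOOTSTRAP_SERVERS = "localhost:9092";

        public static Properties newAdminProperties() {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            return props;
        }

        public static Properties newKafkaProducerProperties() {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return props;
        }

        public static Properties newKafkaConsumerProperties() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
            // Offsets are committed manually via commitAsync in the reproduction code.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            return props;
        }
    }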

For example, with my latest patch, it could still fail with

2022-01-26 22:22:14:342 [main] ERROR ProduceConsumeDemo - Failed to consume at 4933
java.lang.IllegalStateException: Unexpected error code 7 while fetching at offset 309 from topic-partition my-topic-0
	at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1339)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:613)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)

@eolivelli eolivelli deleted the impl/trigger-delayed-fetch-kop branch January 26, 2022 15:06
@eolivelli
Contributor Author

Thank you @BewareMyPower
I will work on a fix.
Probably early next week

Demogorgon314 added a commit that referenced this pull request Jan 27, 2022
BewareMyPower pushed a commit that referenced this pull request Feb 9, 2022
eolivelli pushed a commit to eolivelli/kop that referenced this pull request Feb 24, 2022