Describe the bug
The Kafka producer cannot get valid record metadata in its send callback. This is caused by the following code:
        } else {
            result.getRight().complete(new PartitionResponse(Errors.NONE));
        }
        headerAndPayload.release();
    });
This PartitionResponse constructor sets only the error field; the other fields keep their default value of -1, as the class definition shows:
    public static final class PartitionResponse {
        public Errors error;
        public long baseOffset;
        public long logAppendTime;
        public long logStartOffset;

        public PartitionResponse(Errors error) {
            this(error, -1L, -1L, -1L);
        }
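To make the difference concrete, here is a minimal, self-contained sketch that models the fields and constructors quoted above. `PartitionResponseSketch` and the local `Errors` enum are stand-ins written for this illustration, not the real Kafka classes, and the base offset `42L` is a made-up value; a real fix would need an actual offset from the broker side.

```java
public class PartitionResponseSketch {
    // Stand-in for Kafka's Errors enum; only the value used here is modeled.
    enum Errors { NONE }

    // Local model of the fields and constructors quoted above (hypothetical copy).
    static final class PartitionResponse {
        public Errors error;
        public long baseOffset;
        public long logAppendTime;
        public long logStartOffset;

        PartitionResponse(Errors error) {
            // Same defaults as in the quoted constructor: every numeric field is -1.
            this(error, -1L, -1L, -1L);
        }

        PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
            this.error = error;
            this.baseOffset = baseOffset;
            this.logAppendTime = logAppendTime;
            this.logStartOffset = logStartOffset;
        }
    }

    public static void main(String[] args) {
        // What the handler does today: only the error is set.
        PartitionResponse current = new PartitionResponse(Errors.NONE);
        // What a response carrying metadata could look like (42L is made up).
        PartitionResponse withOffset = new PartitionResponse(Errors.NONE, 42L, -1L, -1L);

        System.out.println("current baseOffset = " + current.baseOffset);
        System.out.println("with offset baseOffset = " + withOffset.baseOffset);
    }
}
```

With the single-argument constructor the producer has no base offset to work with, which is why the callback cannot report a meaningful position.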
To Reproduce
Send a message from a Kafka producer with a callback:
    producer.send(new ProducerRecord<>(topic, 0, "hello"), (recordMetadata, e) -> {
        if (e == null) {
            log.info("send to {}", recordMetadata);
        } else {
            log.error("send failed", e);
        }
    }).get();
The log:
Expected behavior
The producer should receive correct record metadata in the send callback.
Additional context
Offsets are currently not continuous, so any returned offset would be meaningless: the Kafka client uses baseOffset to derive the offset of each message.
This bug should be fixed once continuous offsets are implemented.
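The dependency on baseOffset can be sketched as follows. This is a simplified model of the behavior described above, not the Kafka client's actual code: `OffsetSketch` and `recordOffset` are hypothetical names, and the sketch assumes that -1 marks an unknown base offset, matching the default in the quoted constructor.

```java
public class OffsetSketch {
    // Assumption: -1 marks an unknown offset, as in PartitionResponse(Errors).
    static final long UNKNOWN_OFFSET = -1L;

    // Derive the offset of one record from the batch's base offset and the
    // record's position inside the batch (0 for the first record).
    static long recordOffset(long baseOffset, int relativeOffset) {
        if (baseOffset == UNKNOWN_OFFSET) {
            // Without a base offset no per-record offset can be computed.
            return UNKNOWN_OFFSET;
        }
        return baseOffset + relativeOffset;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("record " + i
                    + ": base=100 gives " + recordOffset(100L, i)
                    + ", base=-1 gives " + recordOffset(UNKNOWN_OFFSET, i));
        }
    }
}
```

The addition only yields correct results if consecutive messages really have consecutive offsets, which is why the fix depends on continuous offsets being available first.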
Code reference: kop/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java, lines 618 to 622 in f81f7e4