Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
Expand All @@ -59,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;

Expand Down Expand Up @@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
// if we have a response, parse it
if (response.hasResponse()) {
// Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse
// https://issues.apache.org/jira/browse/KAFKA-10696
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since we have the jira for tracking, can we remove the TODO? A few more of these in the PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy that

ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
TopicPartition tp = new TopicPartition(r.name(), p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
p.logAppendTimeMs(),
p.logStartOffset(),
p.recordErrors()
.stream()
.map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
.collect(Collectors.toList()),
Comment thread
dajac marked this conversation as resolved.
Outdated
p.errorMessage());
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now);
}
}));
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
} else {
// this is the acks = 0 case, just complete all requests
Expand Down Expand Up @@ -721,7 +734,6 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
if (batches.isEmpty())
return;

Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

// find the minimum magic version used when creating the record sets
Expand All @@ -730,7 +742,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}

ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
Expand All @@ -744,16 +756,28 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
}

String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);

ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
.setTopicData(tpd));
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

String nodeId = Integer.toString(destination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
Expand Down Expand Up @@ -138,8 +140,6 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;

import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -157,7 +157,7 @@
* Identifiers for all the Kafka APIs
*/
public enum ApiKeys {
PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
PRODUCE(0, "Produce", ProduceRequestData.SCHEMAS, ProduceResponseData.SCHEMAS),
FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS),
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,4 @@ public class CommonFields {
"If the epoch provided by the client is larger than the current epoch known to the broker, then " +
"the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then " +
"the FENCED_LEADER_EPOCH error code will be returned.");

// Group APIs
public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");

// Transactional APIs
public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id",
"The transactional id or null if the producer is not transactional");
public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -146,7 +147,7 @@ public Map<Errors, Integer> errorCounts(Throwable e) {
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {
switch (apiKey) {
case PRODUCE:
return new ProduceRequest(struct, apiVersion);
return new ProduceRequest(new ProduceRequestData(struct, apiVersion), apiVersion);
case FETCH:
return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);
case LIST_OFFSETS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -96,7 +97,7 @@ public static AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeade
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) {
switch (apiKey) {
case PRODUCE:
return new ProduceResponse(struct);
return new ProduceResponse(new ProduceResponseData(struct, version));
case FETCH:
return new FetchResponse<>(new FetchResponseData(struct, version));
case LIST_OFFSETS:
Expand Down
Loading