Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
Expand All @@ -56,7 +57,6 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
Expand Down Expand Up @@ -1422,7 +1422,7 @@ private void maybeEnsureValid(RecordBatch batch) {
if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
try {
batch.ensureValid();
} catch (InvalidRecordException e) {
} catch (CorruptRecordException e) {
throw new KafkaException("Record batch for partition " + partition + " at offset " +
batch.baseOffset() + " is invalid, cause: " + e.getMessage());
}
Expand All @@ -1433,7 +1433,7 @@ private void maybeEnsureValid(Record record) {
if (checkCrcs) {
try {
record.ensureValid();
} catch (InvalidRecordException e) {
} catch (CorruptRecordException e) {
throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
+ " is invalid, cause: " + e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.common;

import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.ApiException;

public class InvalidRecordException extends CorruptRecordException {
public class InvalidRecordException extends ApiException {
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. Isn't this an incompatible change? Does this exception not get exposed anywhere currently? Maybe it would be better to deprecate common/record/InvalidRecordException, but still leave it around.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think InvalidRecordException was not exposed as a public API before, and now we are moving its package as well to make it a public class.


private static final long serialVersionUID = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.protocol;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
Expand Down Expand Up @@ -315,7 +316,8 @@ public enum Errors {
NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
NoReassignmentInProgressException::new),
GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
GroupSubscribedToTopicException::new);
GroupSubscribedToTopicException::new),
INVALID_RECORD(87, "This record has failed the validation on the broker and hence will be rejected.", InvalidRecordException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.ByteUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
Expand Down Expand Up @@ -144,11 +146,11 @@ public byte magic() {
@Override
public void ensureValid() {
if (sizeInBytes() < RECORD_BATCH_OVERHEAD)
throw new InvalidRecordException("Record batch is corrupt (the size " + sizeInBytes() +
throw new CorruptRecordException("Record batch is corrupt (the size " + sizeInBytes() +
" is smaller than the minimum allowed overhead " + RECORD_BATCH_OVERHEAD + ")");

if (!isValid())
throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
throw new CorruptRecordException("Record is corrupt (stored crc = " + checksum()
+ ", computed crc = " + computeChecksum() + ")");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.record;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Checksums;
Expand Down Expand Up @@ -135,11 +136,11 @@ public TimestampType wrapperRecordTimestampType() {
*/
public void ensureValid() {
if (sizeInBytes() < RECORD_OVERHEAD_V0)
throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
throw new CorruptRecordException("Record is corrupt (crc could not be retrieved as the record is too "
+ "small, size = " + sizeInBytes() + ")");

if (!isValid())
throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
throw new CorruptRecordException("Record is corrupt (stored crc = " + checksum()
+ ", computed crc = " + computeChecksum() + ")");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -26,7 +27,6 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
Expand Down Expand Up @@ -120,9 +120,15 @@ public class ProduceRequest extends AbstractRequest {
*/
private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;

/**
 * V8 bumped up to add two new fields, error_records (a list of relative record offsets) and error_message,
 * to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} (see KIP-467)
 */
private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;

public static Schema[] schemaVersions() {
return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7};
PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8};
}

public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
Expand Down Expand Up @@ -337,6 +343,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
case 5:
case 6:
case 7:
case 8:
return new ProduceResponse(responseMap, throttleTimeMs);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
Expand Down Expand Up @@ -434,6 +441,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) {
case 5:
case 6:
case 7:
case 8:
return RecordBatch.MAGIC_VALUE_V2;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -53,28 +54,37 @@ public class ProduceResponse extends AbstractResponse {
/**
* Possible error code:
*
* CORRUPT_MESSAGE (2)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* MESSAGE_TOO_LARGE (10)
* INVALID_TOPIC (17)
* RECORD_LIST_TOO_LARGE (18)
* NOT_ENOUGH_REPLICAS (19)
* NOT_ENOUGH_REPLICAS_AFTER_APPEND (20)
* INVALID_REQUIRED_ACKS (21)
* TOPIC_AUTHORIZATION_FAILED (29)
* UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
* INVALID_PRODUCER_EPOCH (47)
* CLUSTER_AUTHORIZATION_FAILED (31)
* TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
* {@link Errors#CORRUPT_MESSAGE}
* {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* {@link Errors#NOT_LEADER_FOR_PARTITION}
* {@link Errors#MESSAGE_TOO_LARGE}
* {@link Errors#INVALID_TOPIC_EXCEPTION}
* {@link Errors#RECORD_LIST_TOO_LARGE}
* {@link Errors#NOT_ENOUGH_REPLICAS}
* {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
* {@link Errors#INVALID_REQUIRED_ACKS}
* {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT}
* {@link Errors#INVALID_PRODUCER_EPOCH}
* {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
* {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* {@link Errors#INVALID_RECORD}
*/

private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
private static final String ERROR_RECORDS_KEY_NAME = "error_records";
private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset";
private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";

private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
"The start offset of the log at the time this produce response was created", INVALID_OFFSET);
private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME,
"The error message of the record that caused the batch to be dropped");
private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
"The global error message summarizing the common root cause of the records that caused the batch to be dropped");

private static final Schema PRODUCE_RESPONSE_V0 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
Expand Down Expand Up @@ -149,9 +159,32 @@ public class ProduceResponse extends AbstractResponse {
*/
private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;

/**
* V8 adds error_records and error_message. (see KIP-467)
*/
public static final Schema PRODUCE_RESPONSE_V8 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64),
new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
"the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
"If LogAppendTime is used for the topic, the timestamp will be the broker local " +
"time when the messages are appended."),
LOG_START_OFFSET_FIELD,
new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema(
new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " +
"that caused the batch to be dropped"),
RELATIVE_OFFSET_ERROR_MESSAGE_FIELD
)), "The relative offsets of records that caused the batch to be dropped"),
ERROR_MESSAGE_FIELD)))))),
Comment thread
hachikuji marked this conversation as resolved.
THROTTLE_TIME_MS);

public static Schema[] schemaVersions() {
return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7};
PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8};
}

private final Map<TopicPartition, PartitionResponse> responses;
Expand Down Expand Up @@ -183,15 +216,28 @@ public ProduceResponse(Struct struct) {
for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicRespStruct = (Struct) topicResponse;
String topic = topicRespStruct.get(TOPIC_NAME);

for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
Struct partRespStruct = (Struct) partResponse;
int partition = partRespStruct.get(PARTITION_ID);
Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);

Map<Integer, String> errorRecords = new HashMap<>();
if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) {
for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) {
Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage;
Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME);
String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "");
errorRecords.put(relativeOffset, relativeOffsetErrorMessage);
}
}

String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, "");
TopicPartition tp = new TopicPartition(topic, partition);
responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset));
responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage));
}
}
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
Expand Down Expand Up @@ -223,6 +269,18 @@ protected Struct toStruct(short version) {
if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can probably do some cleanup in pr.3, e.g. line 269 above can be replaced with setIfExist as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack :)

List<Struct> errorRecords = new ArrayList<>();
for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) {
Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME)
.set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey())
.setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue());
errorRecords.add(recordOffsetAndMessageStruct);
}

partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray());

partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
partitionArray.add(partStruct);
}
topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
Expand Down Expand Up @@ -256,16 +314,28 @@ public static final class PartitionResponse {
public long baseOffset;
public long logAppendTime;
public long logStartOffset;
public Map<Integer, String> errorRecords;
public String errorMessage;

public PartitionResponse(Errors error) {
this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET);
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null);
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) {
this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, "");
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) {
this.error = error;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset;
this.errorRecords = errorRecords;
this.errorMessage = errorMessage;
}

@Override
Expand All @@ -280,6 +350,10 @@ public String toString() {
b.append(logAppendTime);
b.append(", logStartOffset: ");
b.append(logStartOffset);
b.append(", errorRecords: ");
b.append(errorRecords);
b.append(", errorMessage: ");
b.append(errorMessage);
b.append('}');
return b.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
// Version 5 and 6 are the same as version 3.
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
"validVersions": "0-7",
//
// Starting in version 8, the response has ErrorRecords and ErrorMessage. See KIP-467.
"validVersions": "0-8",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
"about": "The transactional ID, or null if the producer is not transactional." },
Expand Down
Loading