Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d8a26b6
KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela Sep 9, 2020
c3e1e1f
add extra lines
anatasiavela Oct 29, 2020
0c7203b
remove extra spaces
anatasiavela Oct 29, 2020
38747a5
add data accessor and creat tests for manually created data
anatasiavela Oct 30, 2020
29392c4
access data from accessor
anatasiavela Oct 30, 2020
fb0633c
remove DESCRIBE_QUORUM case and add better error msg
anatasiavela Oct 30, 2020
32f1e9d
change error messages and asserts
anatasiavela Nov 2, 2020
ab50c8f
swap expected and actual, change error msg, add accessor to private d…
anatasiavela Nov 2, 2020
97237af
change to data accessor method
anatasiavela Nov 2, 2020
32578e4
change recordSet field to print sizeInBytes
anatasiavela Nov 2, 2020
ab73c10
trigger build
anatasiavela Nov 3, 2020
d3b5487
add verbose flag to records in JsonConverter
anatasiavela Nov 5, 2020
bd9a82b
add overload method for verbose flag
anatasiavela Nov 6, 2020
0fa879a
add parentheses
anatasiavela Nov 6, 2020
4dd1524
remove verbose tag in unnecessary places
anatasiavela Nov 12, 2020
b102b67
integrate with rebase
anatasiavela Nov 20, 2020
3e439d5
handle null ProduceRequest
anatasiavela Nov 20, 2020
4bca18e
add specific case for ProduceRequest
anatasiavela Dec 3, 2020
9cc6727
make produce request consistent
anatasiavela Dec 4, 2020
5f91474
remove special handling for ProduceRequest
anatasiavela Dec 11, 2020
97072bb
rebase and address comments
anatasiavela Dec 11, 2020
1fc3a11
address minor comments
anatasiavela Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -893,19 +893,19 @@ else if (req.isInternalRequest && response instanceof ApiVersionsResponse)
private void handleApiVersionsResponse(List<ClientResponse> responses,
InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
final String node = req.destination;
if (apiVersionsResponse.data.errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data.errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
if (apiVersionsResponse.data().errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data().errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.",
Errors.forCode(apiVersionsResponse.data.errorCode()), node, req.header.correlationId());
Errors.forCode(apiVersionsResponse.data().errorCode()), node, req.header.correlationId());
this.selector.close(node);
processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
} else {
// Starting from Apache Kafka 2.4, ApiKeys field is populated with the supported versions of
// the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
// If not provided, the client falls back to version 0.
short maxApiVersion = 0;
if (apiVersionsResponse.data.apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data.apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersionsResponse.data().apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersion != null) {
maxApiVersion = apiVersion.maxVersion();
}
Expand All @@ -914,7 +914,7 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
}
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data.apiKeys());
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3451,12 +3451,12 @@ void handleResponse(AbstractResponse abstractResponse) {
}

// If the error is an error at the group level, the future is failed with it
final Errors groupError = Errors.forCode(response.data.errorCode());
final Errors groupError = Errors.forCode(response.data().errorCode());
if (handleGroupRequestError(groupError, context.future()))
return;

final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())))
);
Expand Down Expand Up @@ -4359,10 +4359,10 @@ ApiVersionsRequest.Builder createRequest(int timeoutMs) {
@Override
void handleResponse(AbstractResponse response) {
final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
if (apiVersionsResponse.data().errorCode() == Errors.NONE.code()) {
future.complete(createFeatureMetadata(apiVersionsResponse));
} else {
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data().errorCode()).exception());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,9 @@ public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
if (isProtocolTypeInconsistent(syncResponse.data().protocolType())) {
log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
syncResponse.data.protocolType(), protocolType());
syncResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful SyncGroup response: {}", syncResponse);
Expand All @@ -743,7 +743,7 @@ public void handle(SyncGroupResponse syncResponse,
synchronized (AbstractCoordinator.this) {
if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
// check protocol name only if the generation is not reset
final String protocolName = syncResponse.data.protocolName();
final String protocolName = syncResponse.data().protocolName();
final boolean protocolNameInconsistent = protocolName != null &&
!protocolName.equals(generation.protocolName);

Expand All @@ -761,7 +761,7 @@ public void handle(SyncGroupResponse syncResponse,
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
lastRebalanceStartMs = -1L;

future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
future.complete(ByteBuffer.wrap(syncResponse.data().assignment()));
}
} else {
log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,8 +1348,8 @@ public void handleResponse(AbstractResponse response) {
Errors error = initProducerIdResponse.error();

if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
initProducerIdResponse.data.producerEpoch());
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
initProducerIdResponse.data().producerEpoch());
setProducerIdAndEpoch(producerIdAndEpoch);
transitionTo(State.READY);
lastError = null;
Expand Down Expand Up @@ -1623,7 +1623,7 @@ Priority priority() {
@Override
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode());
Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode());

if (error == Errors.NONE) {
log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public final Send toSend(RequestHeader header) {
return SendBuilder.buildRequestSend(header, data());
}

protected abstract Message data();
public abstract Message data();

// Visible for testing
public final ByteBuffer serialize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public class AddOffsetsToTxnRequest extends AbstractRequest {

public AddOffsetsToTxnRequestData data;
private final AddOffsetsToTxnRequestData data;

public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
public AddOffsetsToTxnRequestData data;
Expand All @@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
}

@Override
protected AddOffsetsToTxnRequestData data() {
public AddOffsetsToTxnRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public class AddOffsetsToTxnResponse extends AbstractResponse {

public AddOffsetsToTxnResponseData data;
private final AddOffsetsToTxnResponseData data;

public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) {
super(ApiKeys.ADD_OFFSETS_TO_TXN);
Expand All @@ -57,7 +57,7 @@ public int throttleTimeMs() {
}

@Override
protected AddOffsetsToTxnResponseData data() {
public AddOffsetsToTxnResponseData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class AddPartitionsToTxnRequest extends AbstractRequest {

public final AddPartitionsToTxnRequestData data;
private final AddPartitionsToTxnRequestData data;

private List<TopicPartition> cachedPartitions = null;

Expand Down Expand Up @@ -112,7 +112,7 @@ public List<TopicPartition> partitions() {
}

@Override
protected AddPartitionsToTxnRequestData data() {
public AddPartitionsToTxnRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*/
public class AddPartitionsToTxnResponse extends AbstractResponse {

public final AddPartitionsToTxnResponseData data;
private final AddPartitionsToTxnResponseData data;

private Map<TopicPartition, Errors> cachedErrorsMap = null;

Expand Down Expand Up @@ -117,7 +117,7 @@ public Map<Errors, Integer> errorCounts() {
}

@Override
protected AddPartitionsToTxnResponseData data() {
public AddPartitionsToTxnResponseData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public boolean validateOnly() {
}

@Override
protected AlterClientQuotasRequestData data() {
public AlterClientQuotasRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Map<Errors, Integer> errorCounts() {
}

@Override
protected AlterClientQuotasResponseData data() {
public AlterClientQuotasResponseData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public boolean validateOnly() {
}

@Override
protected AlterConfigsRequestData data() {
public AlterConfigsRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public AlterIsrRequest(AlterIsrRequestData data, short apiVersion) {
this.data = data;
}

@Override
public AlterIsrRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public AlterIsrResponse(AlterIsrResponseData data) {
this.data = data;
}

@Override
public AlterIsrResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static AlterPartitionReassignmentsResponse parse(ByteBuffer buffer, short
new AlterPartitionReassignmentsResponseData(new ByteBufferAccessor(buffer), version));
}

@Override
public AlterPartitionReassignmentsResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, short ver
}

@Override
protected AlterReplicaLogDirsRequestData data() {
public AlterReplicaLogDirsRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static AlterUserScramCredentialsRequest parse(ByteBuffer buffer, short ve
return new AlterUserScramCredentialsRequest(new AlterUserScramCredentialsRequestData(new ByteBufferAccessor(buffer), version), version);
}

@Override
public AlterUserScramCredentialsRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public AlterUserScramCredentialsResponse(AlterUserScramCredentialsResponseData r
this.data = responseData;
}

@Override
public AlterUserScramCredentialsResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static final ApiVersionsResponse DEFAULT_API_VERSIONS_RESPONSE = createApiVersionsResponse(
DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);

public final ApiVersionsResponseData data;
private final ApiVersionsResponseData data;

public ApiVersionsResponse(ApiVersionsResponseData data) {
super(ApiKeys.API_VERSIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public String toString() {
}
}

public final BeginQuorumEpochRequestData data;
private final BeginQuorumEpochRequestData data;

private BeginQuorumEpochRequest(BeginQuorumEpochRequestData data, short version) {
super(ApiKeys.BEGIN_QUORUM_EPOCH, version);
this.data = data;
}

@Override
protected BeginQuorumEpochRequestData data() {
public BeginQuorumEpochRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class BeginQuorumEpochResponse extends AbstractResponse {
public final BeginQuorumEpochResponseData data;
private final BeginQuorumEpochResponseData data;

public BeginQuorumEpochResponse(BeginQuorumEpochResponseData data) {
super(ApiKeys.BEGIN_QUORUM_EPOCH);
Expand Down Expand Up @@ -86,7 +86,7 @@ public Map<Errors, Integer> errorCounts() {
}

@Override
protected BeginQuorumEpochResponseData data() {
public BeginQuorumEpochResponseData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static ControlledShutdownRequest parse(ByteBuffer buffer, short version)
version);
}

@Override
public ControlledShutdownRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static ControlledShutdownResponse parse(ByteBuffer buffer, short version)
return new ControlledShutdownResponse(new ControlledShutdownResponseData(new ByteBufferAccessor(buffer), version));
}

@Override
public ControlledShutdownResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public List<AclCreation> aclCreations() {
}

@Override
protected CreateAclsRequestData data() {
public CreateAclsRequestData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public CreateAclsResponse(CreateAclsResponseData data) {
}

@Override
protected CreateAclsResponseData data() {
public CreateAclsResponseData data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short versio
version);
}

@Override
public CreateDelegationTokenRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs,
return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
}

@Override
public CreateDelegationTokenResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public String toString() {
this.data = data;
}

@Override
public CreatePartitionsRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public CreatePartitionsResponse(CreatePartitionsResponseData data) {
this.data = data;
}

@Override
public CreatePartitionsResponseData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public CreateTopicsRequest(CreateTopicsRequestData data, short version) {
this.data = data;
}

@Override
public CreateTopicsRequestData data() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public CreateTopicsResponse(CreateTopicsResponseData data) {
this.data = data;
}

@Override
public CreateTopicsResponseData data() {
return data;
}
Expand Down
Loading