MINOR: Remove Struct from Request/Response classes#2513

Closed
ijuma wants to merge 3 commits into apache:trunk from ijuma:separate-struct

@ijuma
Member

@ijuma ijuma commented Feb 7, 2017

More details:

  • Replaced struct field in Request/Response with a toStruct method. This
    makes the performance model (including memory usage) easier to understand.
    Note that requests have toStruct() while responses have toStruct(version).
  • Replaced mutable version field in Request.Builder with an immutable
    field desiredVersion and a version parameter passed to the build method.
  • Optimised handleFetchRequest to avoid unnecessary creation of Struct
    instances (from 4 to 2 in the worst case and 2 to 1 in the best case).
  • Various clean-ups in request/response classes and their tests. In particular,
    it is now clear what we are testing. Previously, it looked like we were testing
    more than we really were.
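
To illustrate the first two points, here is a minimal sketch of the pattern. It does not use Kafka's real classes: `SketchStruct`, `SketchRequest` and `SketchRequestBuilder` are hypothetical stand-ins, and only the names `toStruct`, `desiredVersion`, `desiredOrLatestVersion` and `build` come from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Struct: an ordered map of field names to values.
final class SketchStruct {
    final Map<String, Object> fields = new LinkedHashMap<>();

    SketchStruct set(String name, Object value) {
        fields.put(name, value);
        return this;
    }
}

// Hypothetical request: keeps plain fields and creates a Struct only when asked.
final class SketchRequest {
    private final short version;
    private final String groupId;

    SketchRequest(short version, String groupId) {
        this.version = version;
        this.groupId = groupId;
    }

    short version() {
        return version;
    }

    // Each call allocates a new SketchStruct, so the cost is visible at the call site.
    SketchStruct toStruct() {
        return new SketchStruct().set("version", version).set("group_id", groupId);
    }
}

// Hypothetical builder: desiredVersion never changes after construction (null means no preference).
final class SketchRequestBuilder {
    private final Short desiredVersion;
    private final short latestVersion;
    private final String groupId;

    SketchRequestBuilder(Short desiredVersion, short latestVersion, String groupId) {
        this.desiredVersion = desiredVersion;
        this.latestVersion = latestVersion;
        this.groupId = groupId;
    }

    short desiredOrLatestVersion() {
        return desiredVersion == null ? latestVersion : desiredVersion;
    }

    // The version actually used is passed in rather than stored as mutable builder state.
    SketchRequest build(short version) {
        return new SketchRequest(version, groupId);
    }
}
```

In this sketch the builder never mutates, and nothing holds on to a Struct between calls.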

With this in place, we could remove AbstractRequest.Builder in the future by
doing the following:

  • Change AbstractRequest.toStruct to accept a version (like responses).
  • Change AbstractRequest.version to be desiredVersion (like Builder).
  • Change ClientRequest to take AbstractRequest.
  • Move validation from the build methods to the request constructors or
    static factory methods.
  • Anything else required for the code to compile again.
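
As a rough sketch of what a version-taking `toStruct` can look like (the shape responses already have, and the first step above proposes for requests): the class and field names below are hypothetical and a plain map stands in for Kafka's `Struct`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical response whose serialized form depends on the version passed in.
final class VersionedResponseSketch {
    private final int throttleTimeMs;
    private final short errorCode;

    VersionedResponseSketch(int throttleTimeMs, short errorCode) {
        this.throttleTimeMs = throttleTimeMs;
        this.errorCode = errorCode;
    }

    // Assumption for illustration only: the throttle time field exists from version 1 onwards.
    Map<String, Object> toStruct(short version) {
        Map<String, Object> struct = new LinkedHashMap<>();
        if (version >= 1) {
            struct.put("throttle_time_ms", throttleTimeMs);
        }
        struct.put("error_code", errorCode);
        return struct;
    }
}
```

Because the object does not store a version, the same instance can be serialized for whichever version the peer supports.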

@asfbot

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1542/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1545/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1542/
Test FAILed (JDK 7 and Scala 2.10).

@apurvam
Contributor

apurvam commented Feb 8, 2017

Hi, not sure what the convention is, but is this ready for review?

@ijuma
Member Author

ijuma commented Feb 8, 2017

@apurvam, if you want to take a look at the general approach, sure. Detailed review, not yet as I have to fix a few minor things (will remove the WIP once that's done).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1559/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1556/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1556/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1560/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1557/
Test FAILed (JDK 8 and Scala 2.12).

Member Author

@ijuma ijuma Feb 8, 2017

We used to create the Struct instance 2 or 3 (if throttleTimeMs > 0) times before. We now create it a single time. I am ignoring the partial Struct that we create if we are throttling a follower (if we count that, then it went from 3 or 4 to 1 or 2).

Contributor

I am probably missing something, but how is this specific change saving up to 2 Struct instance creations? I could see 2 Struct creations if throttleTimeMs > 0, but not 3.

Member Author

@apurvam Good catch, I counted two Structs from FetchRequest creation (if throttleTimeMs > 0) and one from toSend, but the latter didn't happen before my change. So, we just save a Struct creation if throttleTimeMs > 0.

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1557/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1558/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1561/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1558/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1559/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1562/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1559/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1564/
Test FAILed (JDK 8 and Scala 2.11).

@ijuma ijuma force-pushed the separate-struct branch 2 times, most recently from 3f157f4 to 4b1df44 Compare February 8, 2017 15:20
@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1567/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1564/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1564/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1586/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1589/
Test PASSed (JDK 8 and Scala 2.11).

@ijuma ijuma changed the title WIP: Remove Struct from Request/Response classes MINOR: Remove Struct from Request/Response classes Feb 9, 2017
@ijuma
Member Author

ijuma commented Feb 9, 2017

@hachikuji @apurvam, this is ready for review.

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1586/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1605/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1608/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1605/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1623/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1626/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1623/
Test PASSed (JDK 7 and Scala 2.10).

Contributor

@hachikuji hachikuji left a comment

A few minor comments. I'll try to get to the rest tomorrow. I'll probably have to ultimately assume that there were no copy/paste errors in the request/response classes 😉 .

@@ -605,11 +605,10 @@ public void onFailure(RuntimeException e) {
private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
final Map<TopicPartition, Long> timestampsToSearch,
Contributor

maybe we can fix this alignment too?

* Factory method for getting a request object based on ApiKey ID and a buffer
*/
- public static AbstractRequest getRequest(int requestId, short versionId, ByteBuffer buffer) {
+ public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
Contributor

I guess another option is to add the request size as a field in ProduceRequest, since that appears to be the only usage.

Member Author

I had it that way first, but it's a bit weird because we don't always need the size and we can't compute it unless the Struct constructor is used. This way is reasonably clean and the cost seems low. What do you think?

Contributor

Another alternative is to add the size to AbstractRequest perhaps?

Member Author

I don't think that works because we often don't have (and don't need) a size. We had discussed renaming RequestAndSize to RequestMetadata and using that class to keep the request version when we remove the builders. However, the request version is already in the header field, so it seems like we don't need it in this other class. So, I left it as RequestAndSize for now.

Contributor

Makes sense. We can always rename the class if we need to add some more info in there.
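
For readers following the thread, the idea being settled on can be sketched roughly as below: the size is only known while parsing the buffer, so it is returned next to the parsed request instead of being stored on every request. `RequestAndSizeSketch` and its `parse` method are hypothetical, and a `String` stands in for the parsed request; this is not the class from the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical pair of a parsed request and the number of bytes it occupied on the wire.
final class RequestAndSizeSketch {
    final String request; // stand-in for a parsed request object
    final int size;       // only known on the parsing path

    RequestAndSizeSketch(String request, int size) {
        this.request = request;
        this.size = size;
    }

    // Reads the remaining bytes of the buffer and records how many were consumed.
    static RequestAndSizeSketch parse(ByteBuffer buffer) {
        int size = buffer.remaining();
        byte[] bytes = new byte[size];
        buffer.get(bytes);
        return new RequestAndSizeSketch(new String(bytes, StandardCharsets.UTF_8), size);
    }
}
```

Requests created directly (for example by a client) never carry a meaningless size field this way.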


/**
* This should only be used if we need to return a response with a different version than the request, which
* should be very rare. Typically {@link #toSend(String, RequestHeader)} should be used.
Contributor

Maybe mention the ApiVersions use case?

RequestHeader header = clientRequest.makeHeader();
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
Contributor

The awkwardness is that the request objects themselves are kind of a pointless intermediate stage between the builders and the transformation to the Send (for the client anyway). I guess this will be resolved when the builders go away (in effect, the request classes become the builders). I'd probably be inclined to do it in one shot, but up to you. One challenge is dealing with the fact that the request version must be known when the server receives it, but unknown at the time the client creates it.

Member Author

Yes, I agree that the builders should go away. Let's do that in a follow-up PR though. :)

The request version thing isn't an issue because it's in the header field as well, I think.


import java.nio.ByteBuffer;

public class SyncGroupResponse extends AbstractResponse {
Contributor

One benefit of looking at this PR for me is that I am learning about requests/responses I never knew about. Who sends the SyncGroupRequest? What exactly is being synced? Do we have docs about what these requests do at a high level? If not, it may be a good habit to start adding Javadoc to the top of the classes as they are being modified. This would make the code base more accessible for newbies.

Member Author

I think we should document it in Protocol.java as that is used to generate the protocol documentation on the website. In another PR, please. :)

Contributor

@apurvam apurvam left a comment

Thanks for the refactor. I think the changes are positive.

* apache/trunk: (21 commits)
  KAFKA-4340; Follow-up fixing system test failures and handling non default log.retention.ms
  KAFKA-4775; Fix findbugs warnings in kafka-tools
  KAFKA-4484: Set more conservative default values on RocksDB for memory usage
  MINOR: Move compression stream construction into CompressionType
  KAFKA-4709:Error message from Struct.validate() should include the name of the offending field.
  KAFKA-4765; Fixed Intentionally Broken Hosts Resolving to 127.0.53.53 in tests
  KAFKA-4720: Add a KStream#peek(ForeachAction<K, V>) in DSL
  MINOR: fix indention in <pre> tags
  MINOR: add session windows doc to streams.html
  MINOR: Stream metrics documentation
  HOTFIX: fixed section incompatible Steams API changes
  MINOR: don't throw CommitFailedException during suspendTasksAndState
  KAFKA-4761; Fix producer regression handling small or zero batch size
  KAFKA-4756; The auto-generated broker id should be passed to MetricRe…
  KAFKA-4758; Connect missing checks for NO_TIMESTAMP
  MINOR: Update comment in SimpleAclAuthorizer to have correct JSON format
  KAFKA-4340; Change default message.timestamp.difference.max.ms to the same as log.retention.ms
  MINOR: Fix quickstart in docs
  MINOR: add GlobalKTable doc to streams.html
  MINOR: update KafkaStreams.metadataForKey(...) javadoc
  ...
@asfbot

asfbot commented Feb 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1734/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1731/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1731/
Test PASSed (JDK 8 and Scala 2.12).

final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
// If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
short minVersion = requireTimestamp ? (short) 1 : (short) 0;
Contributor

We don't have to do this here, but as I'm reading this, I wonder why we can't push this logic into the builder. For example, we can add a builder method requireTimestamp(boolean).

Contributor

Sounds like a good idea

Member Author

I'll swap the minVersion parameter of forConsumer with a requireTimestamp parameter.
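
A minimal sketch of the suggestion in this thread, assuming a simplified builder: the caller states whether it needs a timestamp and the builder derives the minimum version itself. `ListOffsetBuilderSketch` is hypothetical, and `IllegalArgumentException` stands in for whatever exception the real builder would throw.

```java
// Hypothetical builder that hides the minimum-version rule behind a requireTimestamp flag.
final class ListOffsetBuilderSketch {
    private final short minVersion;

    private ListOffsetBuilderSketch(short minVersion) {
        this.minVersion = minVersion;
    }

    // If a timestamp is needed in the response, v1 is the minimum; otherwise v0 is OK.
    static ListOffsetBuilderSketch forConsumer(boolean requireTimestamp) {
        return new ListOffsetBuilderSketch(requireTimestamp ? (short) 1 : (short) 0);
    }

    short minVersion() {
        return minVersion;
    }

    // Returns the validated version; a real builder would return the request object.
    short build(short version) {
        if (version < minVersion) {
            throw new IllegalArgumentException(
                "Version " + version + " is below the minimum " + minVersion);
        }
        return version;
    }
}
```

Call sites then read `forConsumer(true)` rather than computing `minVersion` themselves.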

this.version = version;
return this;
public short desiredOrLatestVersion() {
return desiredVersion == null ? ProtoUtils.latestVersion(apiKey.id) : desiredVersion;
Contributor

Minor suggestion: maybe we can just add a latestVersion field to ApiKeys and remove all the direct calls to ProtoUtils.latestVersion?
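
The suggestion could look roughly like the sketch below. `ApiKeySketch` is a hypothetical enum, not Kafka's `ApiKeys`, and the ids and version numbers are placeholders chosen only for illustration.

```java
// Hypothetical enum carrying the latest version as a field, so callers
// do not need a separate lookup by id.
enum ApiKeySketch {
    PRODUCE(0, (short) 2), // placeholder id and version
    FETCH(1, (short) 3);   // placeholder id and version

    final int id;
    final short latestVersion;

    ApiKeySketch(int id, short latestVersion) {
        this.id = id;
        this.latestVersion = latestVersion;
    }
}
```

With such a field, `desiredOrLatestVersion()` could read `apiKey.latestVersion` directly.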

Member Author

Contributor

@hachikuji hachikuji left a comment

LGTM. This is a great refactor.

}

- private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
+ private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, replicaId: Int): Long = {
Contributor

Good name change.

@asfgit asfgit closed this in fc1cfe4 Feb 17, 2017
@cmccabe
Contributor

cmccabe commented Feb 18, 2017

Oops, looks like I'm late to the party. Looks like there is a last minute issue with this patch which @hachikuji is fixing in #2566

We talked about this stuff before and I think this is a good change. I agree with the goal of getting rid of the request/requestBuilder distinction, and removing Struct from Request. I also like that you replaced a lot of places that were using int to represent version with short.

One thing that will be particularly nice is when we can get rid of the AbstractRequest#Builder#toString methods, and go back to simply printing out the Struct representation of the RPC. That way, we don't have to worry about forgetting to include something in a Builder toString method.

However, in general, I'm not sure if we truly need to make everything in the request/RequestBuilder immutable. The builder and request objects are always clearly owned by a single thread, so there seems to be little benefit in terms of preventing race conditions. The distinction between desiredOrLatestVersion and the version passed to AbstractRequest#Builder#build() feels unnatural and seems like it could lead to confusion when people change the code in the future. We will also have to have many different constructors for requestBuilder / request objects if we plan to make them completely immutable. This tends to make patches blow up in size, especially since we have so many unit tests directly creating request objects. I'm curious whether you feel that the benefits of immutability outweigh these disadvantages, or if you feel like we should try to keep everything in the request/requestBuilder classes totally immutable.

@ijuma
Member Author

ijuma commented Feb 21, 2017

@cmccabe, thanks for the feedback.

Good point about toString, we may want to add that back to the request/response classes. It does involve a toStruct call, but that's probably not more expensive than creating the String representation.
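
A small sketch of what that could mean, using a hypothetical `PrintableRequestSketch` with a plain map standing in for `Struct`: `toString` delegates to `toStruct()`, so every serialized field is printed without maintaining a separate field list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical request whose string form is derived from its struct representation.
final class PrintableRequestSketch {
    private final String topic;
    private final int partition;

    PrintableRequestSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // Stand-in for building a Struct; insertion order is preserved.
    Map<String, Object> toStruct() {
        Map<String, Object> struct = new LinkedHashMap<>();
        struct.put("topic", topic);
        struct.put("partition", partition);
        return struct;
    }

    // One extra toStruct() call per toString(), but no field can be forgotten.
    @Override
    public String toString() {
        return toStruct().toString();
    }
}
```

The trade-off is the extra allocation mentioned above, which only matters if `toString` sits on a hot path.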

The immutability benefit for me is related to reasoning about the code. If fields are mutable, then you have to take the time to check who is changing them in the path from when they are created to when they are serialized. The reason for removing setVersion is similar. The code now makes the intent more explicit (IMO). Having said that, we do have to evaluate the benefits against the cost so we'll see.

hachikuji pushed a commit to confluentinc/kafka that referenced this pull request Feb 23, 2017
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes apache#2513 from ijuma/separate-struct
@ijuma ijuma deleted the separate-struct branch September 5, 2017 09:30