KAFKA-9854 Re-authenticating causes mismatched parse of response #8471
rajinisivaram merged 1 commit into apache:trunk
The side effect is that the correlation id mismatch error is passed back to Selector, so I'm looking into a better approach.
This is a case of a ListOffsetResponse buffer being parsed as a SaslHandshakeResponse.
|
@chia7712 Thanks for noticing and working on this issue. Could you describe the specific steps that have to occur for this state to exist? |
this issue happens in polling data through
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
    if (requestHeader.correlationId() != responseHeader.correlationId())
        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
}
|
Ok, as you pointed out, this code: seems to imply that Does that summarize the situation, and if so, is it a simple matter of catching that additional possible exception type? Or is there more to the story? |
I'm afraid not. It seems to me that inferring the response order by checking for an exception is always risky. Maybe we should wait until all in-flight requests/responses are completed and block I/O until re-authentication is done, but that means we need to refactor |
|
@chia7712 Good find! Could we just reserve a range of correlation ids for SASL requests, for example 0 to 5? We can rotate through them since we only expect one request in flight at a time during authentication. And it looks quite easy to change NetworkClient to not use that range. It would then become easy to treat responses with other correlation ids as unrelated to re-auth. |
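The reserved-range idea above could be sketched roughly as follows. This is only an illustration of the proposal, not the code in this PR: the class names, the `isReserved` helper and the 0 to 5 bounds are assumptions taken from the example in the comment.

```java
// Hypothetical sketch of a reserved correlation id range (names are illustrative).
final class ReservedCorrelationIdSketch {
    // Assumed bounds, taken from the "0 to 5" example in the comment above.
    static final int MIN_RESERVED = 0;
    static final int MAX_RESERVED = 5;

    static boolean isReserved(int correlationId) {
        return correlationId >= MIN_RESERVED && correlationId <= MAX_RESERVED;
    }

    // Id source for SASL requests: rotates through the reserved range only.
    static final class SaslIds {
        private int next = MIN_RESERVED;

        int nextId() {
            if (!isReserved(next))
                next = MIN_RESERVED; // wrap back to the start of the range
            return next++;           // return current id, then advance
        }
    }

    // Id source for ordinary requests: never hands out a reserved id.
    static final class ClientIds {
        private int next;

        ClientIds(int start) {
            this.next = start;
        }

        int nextId() {
            if (isReserved(next))
                next = MAX_RESERVED + 1; // jump past the reserved range
            return next++;
        }
    }

    public static void main(String[] args) {
        SaslIds sasl = new SaslIds();
        ClientIds client = new ClientIds(0);
        for (int i = 0; i < 8; i++)
            System.out.println("sasl=" + sasl.nextId() + " client=" + client.nextId());
    }
}
```

With the two id sources kept disjoint, a response whose correlation id is outside the reserved range can be recognised as unrelated to re-authentication without relying on a parse failure.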
Nice approach! Let me work on it. |
@rajinisivaram If the above comment is true, it seems to me that an IllegalStateException caused by a mismatched correlation id implies a schema error, since there are no "other" SASL requests. In other words, catching IllegalStateException is enough to resolve this issue, and we don't need to restrict the correlation ids of SASL requests. |
|
@chia7712 I think the correlation id sequence for SASL requests is separate from the one used in NetworkClient for Kafka requests. So without a reserved range, we can't guarantee that every non-SASL response will throw IllegalStateException due to a correlation id mismatch during re-authentication. Unless I misunderstood what you said. |
@rajinisivaram I changed the reserved range from [0, 6] to [1017, 1024] to avoid changing the correlation ids in the mocked responses of other test cases.
We could perhaps reserve the top numbers with MAX_RESERVED_CORRELATION_ID=Integer.MAX_VALUE? Then we don't need the comment explaining why.
rajinisivaram
left a comment
@chia7712 Thanks for the update, looks good. Left some minor comments.
Make it if-then-else, since we don't need the increment in the line below? Also split this line, since we don't put the statement on the same line as the if.
make it if-then-else since we don't need the increment in the line below?
The increment makes the variable hold the next available correlation id after the return value has been evaluated.
Also split this line since we don't include the statement in the same line as if.
Got it
As before, this could be if-then-else without the increment below for the if case.
Does this test hit the issue? If it doesn't reliably test the issue, it may be better to add a unit test since this is expensive and one of the other tests occasionally runs into this issue anyway. If it does reliably test the issue, we should keep this.
It fails consistently on my local machine, but you are right that it is too expensive. I will update the test case.
do we expect authorization exception here?
|
retest this please |
rajinisivaram
left a comment
@chia7712 Thanks for the new tests, look good. Left just a few minor comments.
I think we were using negative values as well before. Perhaps we should use correlation = SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + 1 to avoid assuming here that zero is not reserved?
You are totally right! Will do.
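A small sketch of why starting from MAX_RESERVED_CORRELATION_ID + 1 works even when the reserved range sits at the top of the int range: the addition wraps around to a negative value, which is still a usable correlation id. The class name, the MIN_RESERVED_CORRELATION_ID constant and the width of the range are assumptions for illustration only.

```java
// Hypothetical sketch: skipping a reserved range placed at the top of the int range.
final class TopRangeSkipSketch {
    static final int MAX_RESERVED_CORRELATION_ID = Integer.MAX_VALUE;
    // Assumed width of the reserved range; the real value may differ.
    static final int MIN_RESERVED_CORRELATION_ID = MAX_RESERVED_CORRELATION_ID - 7;

    private int correlation;

    TopRangeSkipSketch(int start) {
        this.correlation = start;
    }

    static boolean isReserved(int correlationId) {
        return correlationId >= MIN_RESERVED_CORRELATION_ID;
    }

    int nextCorrelationId() {
        if (isReserved(correlation)) {
            // Integer.MAX_VALUE + 1 overflows to Integer.MIN_VALUE, so the
            // sequence continues with negative ids instead of reserved ones.
            correlation = MAX_RESERVED_CORRELATION_ID + 1;
        }
        return correlation++;
    }
}
```

Nothing here assumes that zero, or any other particular value, lies outside the reserved range; only the two constants decide that.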
We can use assertThrows here.
As before, we can use assertThrows.
rajinisivaram
left a comment
@chia7712 Thanks for the updates, LGTM. Will merge after the PR builds complete.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ron Dagostino <rdagostino@confluent.io>
the schema of LIST_OFFSETS consists of
If throttle_time is zero and the size of responses is small enough, its binary form is compatible with the schema of SASL_HANDSHAKE, which is composed of
Hence, there is no schema error when SASL_HANDSHAKE tries to parse a response of LIST_OFFSETS, but the correlation id check still throws IllegalStateException because of the mismatched parse. The IllegalStateException is NOT caught and the response is not handed back to Selector, so a cascading error occurs: all following responses are parsed with the wrong schema.
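The overlap can be reproduced with plain byte buffers. The sketch below is not Kafka's parser: it assumes the LIST_OFFSETS body starts with a 4-byte throttle time followed by a 4-byte count of responses, and that the SASL_HANDSHAKE body is a 2-byte error code followed by a 4-byte count of length-prefixed mechanism strings. All class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of two response layouts that overlap at the byte level.
final class SchemaOverlapSketch {
    // Assumed layout: throttle time (int32), response count (int32), then one
    // topic name as a length-prefixed string. Remaining fields are omitted.
    static ByteBuffer listOffsetsLikeBody(int throttleTimeMs, int responseCount, String topic) {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 2 + name.length);
        buf.putInt(throttleTimeMs);
        buf.putInt(responseCount);
        buf.putShort((short) name.length);
        buf.put(name);
        buf.flip();
        return buf;
    }

    // Assumed layout: error code (int16), mechanism count (int32), then that many
    // length-prefixed strings. Returns {errorCode, mechanismCount}; throws a
    // RuntimeException when the bytes do not fit this layout.
    static int[] parseAsHandshake(ByteBuffer body) {
        ByteBuffer view = body.duplicate();
        short errorCode = view.getShort();
        int mechanismCount = view.getInt();
        if (mechanismCount < 0)
            throw new IllegalArgumentException("negative array size: " + mechanismCount);
        for (int i = 0; i < mechanismCount; i++) {
            short length = view.getShort(); // BufferUnderflowException if too short
            if (length < 0 || length > view.remaining())
                throw new IllegalArgumentException("bad string length: " + length);
            view.position(view.position() + length);
        }
        return new int[] {errorCode, mechanismCount};
    }

    public static void main(String[] args) {
        int[] parsed = parseAsHandshake(listOffsetsLikeBody(0, 1, "topic"));
        System.out.println("errorCode=" + parsed[0] + " mechanisms=" + parsed[1]);
    }
}
```

Under these assumptions, a zero throttle time and a small response count make the first six bytes all zero, so the handshake layout reads them as "no error, no mechanisms" and raises no parse error. A non-zero throttle time or a very large response count makes the mis-parse fail instead, which matches the condition stated above.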
https://issues.apache.org/jira/browse/KAFKA-9854