Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

Expand All @@ -29,6 +30,7 @@
public class ApiVersionsResponse extends AbstractRequestResponse {

private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();

public static final String ERROR_CODE_KEY_NAME = "error_code";
public static final String API_VERSIONS_KEY_NAME = "api_versions";
Expand Down Expand Up @@ -106,6 +108,18 @@ public static ApiVersionsResponse fromError(Errors error) {
return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList());
}

/**
 * Returns the cached ApiVersionsResponse covering every entry of {@code ApiKeys},
 * built once in {@code createApiVersionsResponse()} and reused for all requests.
 */
public static ApiVersionsResponse apiVersionsResponse() {
return API_VERSIONS_RESPONSE;
}

/**
 * Builds the response advertising, for every known API key, the minimum and the
 * current (maximum) supported protocol versions taken from {@code Protocol}'s tables.
 * Returned with error code {@code Errors.NONE}.
 */
private static ApiVersionsResponse createApiVersionsResponse() {
    List<ApiVersion> supportedVersions = new ArrayList<>();
    for (ApiKeys key : ApiKeys.values()) {
        ApiVersion version = new ApiVersion(key.id, Protocol.MIN_VERSIONS[key.id], Protocol.CURR_VERSION[key.id]);
        supportedVersions.add(version);
    }
    return new ApiVersionsResponse(Errors.NONE.code(), supportedVersions);
}

private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
for (ApiVersion apiVersion: apiVersions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.ResponseSend;
Expand Down Expand Up @@ -290,7 +292,11 @@ private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, Auth
isKafkaRequest = true;

ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
LOG.debug("Handle Kafka request {}", apiKey);
switch (apiKey) {
case API_VERSIONS:
handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
break;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One impact of this implementation is that we allow a client to send any number of ApiVersion requests before the SaslHandshake request. It doesn't seem to hurt. However, it seems that requiring that a client issue a SaslHandshake request immediately after ApiVersion request probably makes the protocol simpler and easier to understand?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there isn't a strong reason to constrain the semantics to at most one request prior to the handshake, I don't think we should.
In my view, adding constraints like that actually makes the protocol less simple, though it might make the broker implementation simpler. From the client's point of view, a less strict protocol is easier to work with.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it will complicate the implementation quite a bit. Count requests per connection? We don't do it for any other request type...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broker implementation is more complicated, but not too complicated, we'd have to update SaslState or something like that. However, I also think that adding this restriction doesn't necessarily make things simpler overall (I can see Jun's point, but also Magnus's), so I'm +0 on leaving as is given that any change at this stage adds some risk.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edenhill : Is there a use case for a client to ask for ApiVersionRequest more than once before sasl handshake? I thought a more strict protocol is easier for a client to deal with since there are few code paths that a client has to test.

@gwenshap : Yes, the implementation on the server side will be a bit more complicated to restrict the number of times ApiVersionRequest is issued before SaslHandshakeRequest, but not too much. The requests before authentication are already treated specially anyway. My main concern is on documenting the protocol. It's simpler if the protocol is

ApiVersionRequest? SaslHandShakeRequest SaslTokens

instead of

ApiVersionRequest* SaslHandShakeRequest SaslTokens

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram : Yes, if you could submit a followup patch on this, that will be great. Thanks,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the case where we add a new version of ApiVersionsRequest. In that case, a client could issue an ApiVersionsRequest with a newer version and a 0.10 broker would reply with a response containing an error code. The client would then issue an ApiVersionsRequest with version 0.

We are not taking advantage of this yet, but it's something we discussed. For clients, it's fine to add this support later, but we can't do that for brokers so they need the flexibility from the start.

Or am I missing something?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, very good point. To support that, we can allow multiple ApiVersionRequests before SaslHandshakeRequest as it is. However, we need to fix a few things in the code.

  1. We handle unsupported version of ApiVersionRequest in KafkaApis.handleApiVersionsRequest(). That's too late. In RequestChannel, we have the generic logic to throw an exception on unknown version of a request which will kill the connection.
  2. In SaslServerAuthenticator.handleKafkaRequest(), when receiving an unknown version of ApiVersionRequest, we throw an UnsupportedSaslMechanismException which kills the connection. We should send an error response in this case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good @junrao. @rajinisivaram, maybe you can add some tests to verify the desired behaviour as described by Jun along with the code changes? Thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao @ijuma Implementation changes with unit tests are in the PR #1310

case SASL_HANDSHAKE:
clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
break;
Expand Down Expand Up @@ -336,6 +342,10 @@ private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshake
}
}

/**
 * Handles an ApiVersionsRequest received before SASL authentication completes by
 * replying with the broker's pre-built list of supported API versions.
 * Only the request header is needed (for the correlation id of the response);
 * the request object itself is not inspected here.
 *
 * Note: the original declared {@code throws UnsupportedSaslMechanismException},
 * but nothing in this body can throw it, so the declaration has been removed.
 */
private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException {
    sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
}

private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException {
ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ public static Selector createSelector(ChannelBuilder channelBuilder) {

public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {

waitForChannelReady(selector, node);
String prefix = TestUtils.randomString(minMessageSize);
int requests = 0;
int responses = 0;
// wait for handshake to finish
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
requests++;
while (responses < messageCount) {
Expand All @@ -66,6 +63,15 @@ public static void checkClientConnection(Selector selector, String node, int min
}
}

public static void waitForChannelReady(Selector selector, String node) throws IOException {
// wait for handshake to finish
int secondsLeft = 30;
while (!selector.isChannelReady(node) && secondsLeft-- > 0) {
selector.poll(1000L);
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we check that the channel is ready once we exit the while loop and if it is not, throw an exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

assertTrue(selector.isChannelReady(node));
}

public static void waitForChannelClose(Selector selector, String node) throws IOException {
boolean closed = false;
for (int i = 0; i < 30; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.CertStores;
Expand All @@ -34,11 +37,18 @@
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -209,6 +219,30 @@ public void testMultipleServerMechanisms() throws Exception {
NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
}

/**
 * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
 * prior to the SASL handshake flow and that subsequent authentication succeeds
 * when the transport layer is PLAINTEXT. This test simulates SASL authentication using a
 * (non-SASL) PLAINTEXT client and sends an ApiVersionsRequest straight after the
 * connection to the server is established, before any SASL-related packets are sent.
 * Delegates to the shared scenario with {@code SASL_PLAINTEXT} as the server protocol.
 */
@Test
public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception {
testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT);
}

/**
 * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
 * prior to the SASL handshake flow and that subsequent authentication succeeds
 * when the transport layer is SSL. This test simulates SASL authentication using a
 * (non-SASL) SSL client and sends an ApiVersionsRequest straight after the
 * SSL handshake, before any SASL-related packets are sent.
 * Delegates to the shared scenario with {@code SASL_SSL} as the server protocol.
 */
@Test
public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception {
testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL);
}

/**
* Tests that any invalid data during Kafka SASL handshake request flow
* or the actual SASL authentication flow result in authentication failure
Expand All @@ -223,7 +257,7 @@ public void testInvalidSaslPacket() throws Exception {
// Send invalid SASL packet after valid handshake request
String node1 = "invalid1";
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
sendHandshakeRequest(node1);
sendHandshakeRequestReceiveResponse(node1);
Random random = new Random();
byte[] bytes = new byte[1024];
random.nextBytes(bytes);
Expand All @@ -246,6 +280,33 @@ public void testInvalidSaslPacket() throws Exception {
createAndCheckClientConnection(securityProtocol, "good2");
}

/**
 * Tests that an ApiVersionsRequest sent after the Kafka SASL handshake request,
 * but before actual SASL authentication, results in authentication failure
 * (the server closes the connection). A non-SASL PLAINTEXT client is used so the
 * requests are processed by the server's {@link SaslServerAuthenticator} before
 * the client is authenticated, as in
 * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}.
 */
@Test
public void testInvalidApiVersionsRequestSequence() throws Exception {
    SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
    server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);

    // Complete the handshake first, then send an out-of-sequence ApiVersionsRequest.
    String badNode = "invalid1";
    createClientConnection(SecurityProtocol.PLAINTEXT, badNode);
    sendHandshakeRequestReceiveResponse(badNode);

    RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2);
    ApiVersionsRequest outOfOrderRequest = new ApiVersionsRequest();
    selector.send(new NetworkSend(badNode, RequestSend.serialize(header, outOfOrderRequest.toStruct())));
    // The server must reject the sequence by closing the channel.
    NetworkTestUtils.waitForChannelClose(selector, badNode);
    selector.close();

    // A well-behaved connection must still authenticate successfully afterwards.
    createAndCheckClientConnection(securityProtocol, "good1");
}

/**
* Tests that packets that are too big during Kafka SASL handshake request flow
* or the actual SASL authentication flow result in authentication failure
Expand All @@ -260,7 +321,7 @@ public void testPacketSizeTooBig() throws Exception {
// Send SASL packet with large size after valid handshake request
String node1 = "invalid1";
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
sendHandshakeRequest(node1);
sendHandshakeRequestReceiveResponse(node1);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.putInt(Integer.MAX_VALUE);
buffer.put(new byte[buffer.capacity() - 4]);
Expand Down Expand Up @@ -312,7 +373,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
// Send metadata request after Kafka SASL handshake request
String node2 = "invalid2";
createClientConnection(SecurityProtocol.PLAINTEXT, node2);
sendHandshakeRequest(node2);
sendHandshakeRequestReceiveResponse(node2);
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2);
MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic"));
selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct())));
Expand Down Expand Up @@ -371,6 +432,68 @@ public void testInvalidMechanism() throws Exception {
NetworkTestUtils.waitForChannelClose(selector, node);
}

/**
 * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
 * prior to SASL handshake flow and that subsequent authentication succeeds
 * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates
 * SASL authentication after ApiVersionsRequest.
 * <p>
 * Test sequence (using <tt>securityProtocol=PLAINTEXT</tt> as an example):
 * <ol>
 * <li>Starts a SASL_PLAINTEXT test server that simply echoes back client requests after authentication.</li>
 * <li>A (non-SASL) PLAINTEXT test client connects to the SASL server port. Client is now unauthenticated.</li>
 * <li>The unauthenticated non-SASL client sends an ApiVersionsRequest and validates the response.
 * A valid response indicates that {@link SaslServerAuthenticator} of the test server responded to
 * the ApiVersionsRequest even though the client is not yet authenticated.</li>
 * <li>The unauthenticated non-SASL client sends a SaslHandshakeRequest and validates the response. A valid response
 * indicates that {@link SaslServerAuthenticator} of the test server responded to the SaslHandshakeRequest
 * after processing ApiVersionsRequest.</li>
 * <li>The unauthenticated non-SASL client sends the SASL/PLAIN packet containing username/password to authenticate
 * itself. The client is now authenticated by the server. At this point this test client is at the
 * same state as a regular SASL_PLAINTEXT client that is <tt>ready</tt>.</li>
 * <li>The authenticated client sends random data to the server and checks that the data is echoed
 * back by the test server (ie, not Kafka request-response) to ensure that the client now
 * behaves exactly as a regular SASL_PLAINTEXT client that has completed authentication.</li>
 * </ol>
 */
private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception {
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);

// Create non-SASL connection to manually authenticate after ApiVersionsRequest
String node = "1";
SecurityProtocol clientProtocol;
switch (securityProtocol) {
case SASL_PLAINTEXT:
clientProtocol = SecurityProtocol.PLAINTEXT;
break;
case SASL_SSL:
clientProtocol = SecurityProtocol.SSL;
break;
default:
throw new IllegalArgumentException("Server protocol " + securityProtocol + " is not SASL");
}
createClientConnection(clientProtocol, node);
NetworkTestUtils.waitForChannelReady(selector, node);

// Send ApiVersionsRequest and check response
ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node);
assertEquals(Protocol.MIN_VERSIONS[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
assertEquals(Protocol.CURR_VERSION[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);

// Send SaslHandshakeRequest and check response
SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());

// Authenticate using PLAIN username/password
String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));
waitForResponse();

// Check send/receive on the manually authenticated connection
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}

private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) {
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms);
Expand All @@ -396,13 +519,35 @@ private void createAndCheckClientConnection(SecurityProtocol securityProtocol, S
selector = null;
}

private void sendHandshakeRequest(String node) throws Exception {
RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, "someclient", 1);
/**
 * Serializes and sends the given Kafka request over the test selector to {@code node},
 * then blocks until a response arrives and returns its parsed body as a Struct.
 */
private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse request) throws IOException {
    RequestHeader requestHeader = new RequestHeader(apiKey.id, "someclient", 1);
    NetworkSend send = new NetworkSend(node, RequestSend.serialize(requestHeader, request.toStruct()));
    selector.send(send);
    return NetworkClient.parseResponse(waitForResponse(), requestHeader);
}

/**
 * Sends a SaslHandshakeRequest for the PLAIN mechanism to {@code node}, waits for the
 * response, asserts it carries no error, and returns the parsed handshake response.
 */
private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
    SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
    SaslHandshakeResponse handshakeResponse =
        new SaslHandshakeResponse(sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, request));
    assertEquals(Errors.NONE.code(), handshakeResponse.errorCode());
    return handshakeResponse;
}

/**
 * Sends an ApiVersionsRequest to {@code node}, waits for the response, asserts it
 * carries no error, and returns the parsed ApiVersionsResponse.
 */
private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
    // Renamed from the copy-pasted "handshakeRequest": this is an ApiVersionsRequest.
    ApiVersionsRequest versionsRequest = new ApiVersionsRequest();
    Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, versionsRequest);
    ApiVersionsResponse response = new ApiVersionsResponse(responseStruct);
    assertEquals(Errors.NONE.code(), response.errorCode());
    return response;
}

private ByteBuffer waitForResponse() throws IOException {
int waitSeconds = 10;
do {
selector.poll(1000);
} while (selector.completedSends().isEmpty() && waitSeconds-- > 0);
} while (selector.completedReceives().isEmpty() && waitSeconds-- > 0);
assertEquals(1, selector.completedReceives().size());
return selector.completedReceives().get(0).payload();
}
}
12 changes: 1 addition & 11 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,6 @@ import scala.collection._
import scala.collection.JavaConverters._
import org.apache.kafka.common.requests.SaslHandshakeResponse

object KafkaApis {
  /** Response advertising the supported version range of every API key; built once and reused. */
  val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava)

  /** Maps each API key id to an ApiVersion entry derived from Protocol's min/current version tables. */
  private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] =
    ApiKeys.values.map { apiKey =>
      apiKey.id -> new ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id), Protocol.CURR_VERSION(apiKey.id))
    }.toMap
}


/**
* Logic to handle the various Kafka requests
*/
Expand Down Expand Up @@ -1041,7 +1031,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) &&
request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
val responseBody = if (isApiVersionsRequestVersionSupported)
KafkaApis.apiVersionsResponse
ApiVersionsResponse.apiVersionsResponse
else
ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
Expand Down
Loading