diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 6d279ac9f4f2b..bba1c43cc3f02 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -214,7 +214,11 @@ SaslClient createSaslClient() { String[] mechs = {mechanism}; log.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}", clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs)); - return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler); + SaslClient retvalSaslClient = Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler); + if (retvalSaslClient == null) { + throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism); + } + return retvalSaslClient; }); } catch (PrivilegedActionException e) { throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism, e.getCause()); diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index b959d68896426..20dbf7b0d9d08 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -193,8 +193,11 @@ private void createSaslServer(String mechanism) throws IOException { try { saslServer = Subject.doAs(subject, (PrivilegedExceptionAction) () -> Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler)); + if (saslServer == null) { + throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism); + } } catch (PrivilegedActionException e) { - throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause()); + throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism, e.getCause()); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index ed922b18f6f36..5c1ce3cfc0653 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -1236,9 +1236,18 @@ public void testInvalidMechanism() throws Exception { saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID"); server = createEchoServer(securityProtocol); - createAndCheckClientConnectionFailure(securityProtocol, node); - server.verifyAuthenticationMetrics(0, 1); - server.verifyReauthenticationMetrics(0, 0); + try { + createAndCheckClientConnectionFailure(securityProtocol, node); + fail("Did not generate exception prior to creating channel"); + } catch (IOException expected) { + server.verifyAuthenticationMetrics(0, 0); + server.verifyReauthenticationMetrics(0, 0); + Throwable underlyingCause = expected.getCause().getCause().getCause(); + assertEquals(SaslAuthenticationException.class, underlyingCause.getClass()); + assertEquals("Failed to create SaslClient with mechanism INVALID", underlyingCause.getMessage()); + } finally { + closeClientConnectionIfNecessary(); + } } /**