diff --git a/examples/bin/dsql-main b/examples/bin/dsql-main index 8dfe8823a68c..cf685810bf3d 100755 --- a/examples/bin/dsql-main +++ b/examples/bin/dsql-main @@ -400,7 +400,7 @@ def main(): parser_fmt.add_argument('--format', type=str, default='table', choices=('csv', 'tsv', 'json', 'table'), help='Result format') parser_fmt.add_argument('--header', action='store_true', help='Include header row for formats "csv" and "tsv"') parser_fmt.add_argument('--tsv-delimiter', type=str, default='\t', help='Delimiter for format "tsv"') - parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://docs.imply.io/on-prem/query-data/sql for options') + parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://druid.apache.org/docs/latest/querying/sql.html#connection-context for options') parser_oth.add_argument('--execute', '-e', type=str, help='Execute single SQL query') args = parser.parse_args() diff --git a/integration-tests/docker/tls/generate-good-client-cert.sh b/integration-tests/docker/tls/generate-good-client-cert.sh index 895e6c34bad8..0f16c1449c5c 100755 --- a/integration-tests/docker/tls/generate-good-client-cert.sh +++ b/integration-tests/docker/tls/generate-good-client-cert.sh @@ -58,5 +58,5 @@ openssl x509 -req -days 3650 -in client.csr -CA root.pem -CAkey root.key -set_se openssl pkcs12 -export -in client.pem -inkey client.key -out client.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123 keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123 -# Create a Java truststore with the imply test cluster root CA +# Create a Java truststore with the druid test cluster root CA keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt diff --git a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh index 8f38be303a8d..e26cdac40887 100755 --- a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh +++ b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh @@ -63,7 +63,7 @@ openssl x509 -req -days 3650 -in server.csr -CA root.pem -CAkey root.key -set_se openssl pkcs12 -export -in server.pem -inkey server.key -out server.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123 keytool -importkeystore -srckeystore server.p12 -srcstoretype PKCS12 -destkeystore server.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123 -# Create a Java truststore with the imply test cluster root CA +# Create a Java truststore with the druid test cluster root CA keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt # Revoke one of the client certs diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index bbb80e49aaf7..fc6f93ed6532 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import com.google.inject.Binder; @@ -345,7 +346,13 @@ static Server makeAndInitializeServer( List monitoredConnFactories = new ArrayList<>(); for (ConnectionFactory cf : connector.getConnectionFactories()) { - monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS)); + // we only want to monitor the first connection factory, since it will pass the connection to subsequent + // connection factories (in this case HTTP/1.1 after the connection is unencrypted for SSL) + if (cf.getProtocol().equals(connector.getDefaultProtocol())) { + monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS)); + } else { + monitoredConnFactories.add(cf); + } } connector.setConnectionFactories(monitoredConnFactories); } @@ -531,4 +538,10 @@ protected TrustManager[] getTrustManagers( return newTrustManagers; } } + + @VisibleForTesting + public int getActiveConnections() + { + return ACTIVE_CONNECTIONS.get(); + } } diff --git a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java index defe2de902a8..e38865a1a3ff 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server.initialization; +import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.servlet.GuiceFilter; @@ -60,6 +61,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.zip.Deflater; @@ -72,6 +74,7 @@ public abstract class BaseJettyTest protected HttpClient client; protected Server server; protected int port = -1; + protected int tlsPort = -1; public static void setProperties() { @@ -87,6 +90,8 @@ public void setup() throws Exception Injector injector = setupInjector(); final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class)); port = node.getPlaintextPort(); + tlsPort = node.getTlsPort(); + lifecycle = injector.getInstance(Lifecycle.class); lifecycle.start(); ClientHolder holder = injector.getInstance(ClientHolder.class); @@ -175,6 +180,71 @@ public Response hello() } } + @Path("/latched") + public static class LatchedResource + { + private final LatchedRequestStateHolder state; + + @Inject + public LatchedResource(LatchedRequestStateHolder state) + { + this.state = state; + } + + @GET + @Path("/hello") + @Produces(MediaType.APPLICATION_JSON) + public Response hello() + { + state.serverStartRequest(); + try { + state.serverWaitForClientReadyToFinishRequest(); + } + catch (InterruptedException ignored) { + } + return Response.ok(DEFAULT_RESPONSE_CONTENT).build(); + } + } + + public static class LatchedRequestStateHolder + { + private static final int TIMEOUT_MILLIS = 10_000; + + private CountDownLatch requestStartLatch; + private CountDownLatch requestEndLatch; + + public LatchedRequestStateHolder() + { + reset(); + } + + public void reset() + { + requestStartLatch = new CountDownLatch(1); + requestEndLatch = new CountDownLatch(1); + } + + public void clientWaitForServerToStartRequest() throws InterruptedException + { + requestStartLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } + + public void serverStartRequest() + { + requestStartLatch.countDown(); + } + + public void serverWaitForClientReadyToFinishRequest() throws InterruptedException + { + requestEndLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } + + public void clientReadyToFinishRequest() + { + requestEndLatch.countDown(); + } + } + @Path("/default") public static class DefaultResource { diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java index 57608055ad4b..64ccd1487699 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java @@ -34,26 +34,35 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.HttpClientConfig; +import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.initialization.jetty.JettyServerModule; import org.apache.druid.server.initialization.jetty.ServletFilterHolder; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.eclipse.jetty.server.Server; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import javax.servlet.DispatcherType; import javax.servlet.Filter; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; @@ -61,6 +70,8 @@ import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.EnumSet; import java.util.Locale; import java.util.Map; @@ -74,10 +85,109 @@ public class JettyTest extends BaseJettyTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private HttpClientConfig sslConfig; + + private Injector injector; + + private LatchedRequestStateHolder latchedRequestState; + @Override protected Injector setupInjector() { - return Initialization.makeInjectorWithModules( + TLSServerConfig tlsConfig; + try { + File keyStore = new File(JettyTest.class.getClassLoader().getResource("server.jks").getFile()); + Path tmpKeyStore = Files.copy(keyStore.toPath(), new File(folder.newFolder(), "server.jks").toPath()); + File trustStore = new File(JettyTest.class.getClassLoader().getResource("truststore.jks").getFile()); + Path tmpTrustStore = Files.copy(trustStore.toPath(), new File(folder.newFolder(), "truststore.jks").toPath()); + PasswordProvider pp = () -> "druid123"; + tlsConfig = new TLSServerConfig() + { + @Override + public String getKeyStorePath() + { + return tmpKeyStore.toString(); + } + + @Override + public String getKeyStoreType() + { + return "jks"; + } + + @Override + public PasswordProvider getKeyStorePasswordProvider() + { + return pp; + } + + @Override + public PasswordProvider getKeyManagerPasswordProvider() + { + return pp; + } + + @Override + public String getTrustStorePath() + { + return tmpTrustStore.toString(); + } + + @Override + public String getTrustStoreAlgorithm() + { + return "PKIX"; + } + + @Override + public PasswordProvider getTrustStorePasswordProvider() + { + return pp; + } + + @Override + public String getCertAlias() + { + return "druid"; + } + + @Override + public boolean isRequireClientCertificate() + { + return false; + } + + @Override + public boolean isRequestClientCertificate() + { + return false; + } + + @Override + public boolean isValidateHostnames() + { + return false; + } + }; + + sslConfig = + HttpClientConfig.builder() + .withSslContext( + HttpClientInit.sslContextWithTrustedKeyStore(tmpTrustStore.toString(), pp.getPassword()) + ) + .withWorkerCount(1) + .withReadTimeout(Duration.ZERO) + .build(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + latchedRequestState = new LatchedRequestStateHolder(); + injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of( new Module() @@ -88,9 +198,11 @@ public void configure(Binder binder) JsonConfigProvider.bindInstance( binder, Key.get(DruidNode.class, Self.class), - new DruidNode("test", "localhost", false, null, null, true, false) + new DruidNode("test", "localhost", false, 9988, 9999, true, true) ); + binder.bind(TLSServerConfig.class).toInstance(tlsConfig); binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class); + binder.bind(LatchedRequestStateHolder.class).toInstance(latchedRequestState); Multibinder multibinder = Multibinder.newSetBinder( binder, @@ -132,7 +244,9 @@ public EnumSet getDispatcherType() } ); + Jerseys.addResource(binder, SlowResource.class); + Jerseys.addResource(binder, LatchedResource.class); Jerseys.addResource(binder, ExceptionResource.class); Jerseys.addResource(binder, DefaultResource.class); Jerseys.addResource(binder, DirectlyReturnResource.class); @@ -143,6 +257,7 @@ public EnumSet getDispatcherType() ) ); + return injector; } @Test @@ -209,13 +324,19 @@ public void testGzipResponseCompression() throws Exception final HttpURLConnection get = (HttpURLConnection) url.openConnection(); get.setRequestProperty("Accept-Encoding", "gzip"); Assert.assertEquals("gzip", get.getContentEncoding()); - Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8)); + Assert.assertEquals( + DEFAULT_RESPONSE_CONTENT, + IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8) + ); final HttpURLConnection post = (HttpURLConnection) url.openConnection(); post.setRequestProperty("Accept-Encoding", "gzip"); post.setRequestMethod("POST"); Assert.assertEquals("gzip", post.getContentEncoding()); - Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8)); + Assert.assertEquals( + DEFAULT_RESPONSE_CONTENT, + IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8) + ); final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection(); Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding()); @@ -224,7 +345,10 @@ public void testGzipResponseCompression() throws Exception final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection(); postNoGzip.setRequestMethod("POST"); Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding()); - Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8)); + Assert.assertEquals( + DEFAULT_RESPONSE_CONTENT, + IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8) + ); } // Tests that threads are not stuck when partial chunk is not finalized @@ -311,4 +435,81 @@ public void testGzipRequestDecompression() throws Exception new InputStreamResponseHandler() ).get()), Charset.defaultCharset())); } + + @Test + public void testNumConnectionsMetricHttp() throws Exception + { + String text = "hello"; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) { + gzipOutputStream.write(text.getBytes(Charset.defaultCharset())); + } + Request request = new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/latched/hello")); + request.setHeader("Content-Encoding", "gzip"); + request.setContent(MediaType.TEXT_PLAIN, out.toByteArray()); + + JettyServerModule jsm = injector.getInstance(JettyServerModule.class); + latchedRequestState.reset(); + + Assert.assertEquals(0, jsm.getActiveConnections()); + ListenableFuture go = client.go( + request, + new InputStreamResponseHandler() + ); + latchedRequestState.clientWaitForServerToStartRequest(); + Assert.assertEquals(1, jsm.getActiveConnections()); + latchedRequestState.clientReadyToFinishRequest(); + go.get(); + waitForJettyServerModuleActiveConnectionsZero(jsm); + Assert.assertEquals(0, jsm.getActiveConnections()); + } + + @Test + public void testNumConnectionsMetricHttps() throws Exception + { + String text = "hello"; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) { + gzipOutputStream.write(text.getBytes(Charset.defaultCharset())); + } + Request request = new Request(HttpMethod.GET, new URL("https://localhost:" + tlsPort + "/latched/hello")); + request.setHeader("Content-Encoding", "gzip"); + request.setContent(MediaType.TEXT_PLAIN, out.toByteArray()); + HttpClient client; + try { + client = HttpClientInit.createClient( + sslConfig, + lifecycle + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + JettyServerModule jsm = injector.getInstance(JettyServerModule.class); + latchedRequestState.reset(); + + Assert.assertEquals(0, jsm.getActiveConnections()); + ListenableFuture go = client.go( + request, + new InputStreamResponseHandler() + ); + latchedRequestState.clientWaitForServerToStartRequest(); + Assert.assertEquals(1, jsm.getActiveConnections()); + latchedRequestState.clientReadyToFinishRequest(); + go.get(); + waitForJettyServerModuleActiveConnectionsZero(jsm); + Assert.assertEquals(0, jsm.getActiveConnections()); + } + + private void waitForJettyServerModuleActiveConnectionsZero(JettyServerModule jsm) throws InterruptedException + { + // it can take a bit to close the connection, so maybe sleep for a while and hope it closes + final int sleepTimeMills = 10; + final int totalSleeps = 5_000 / sleepTimeMills; + int count = 0; + while (jsm.getActiveConnections() > 0 && count++ < totalSleeps) { + Thread.sleep(sleepTimeMills); + } + } } diff --git a/server/src/test/resources/server.jks b/server/src/test/resources/server.jks new file mode 100644 index 000000000000..664e3754cc46 Binary files /dev/null and b/server/src/test/resources/server.jks differ diff --git a/server/src/test/resources/truststore.jks b/server/src/test/resources/truststore.jks new file mode 100644 index 000000000000..3cd268c86651 Binary files /dev/null and b/server/src/test/resources/truststore.jks differ