From 058e33e569a9c3e7ac5051257eda2a48b825536d Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 20 Feb 2024 08:32:44 +0530 Subject: [PATCH 1/9] failure count server select strategy --- .../druid/client/DirectDruidClient.java | 55 +++++++++++++++--- .../FailureCountServerSelectorStrategy.java | 56 +++++++++++++++++++ .../selector/ServerSelectorStrategy.java | 3 +- 3 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 9a93c8bacd01..392efeb81b3e 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -74,6 +74,7 @@ import java.util.Enumeration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -85,6 +86,7 @@ import java.util.concurrent.atomic.AtomicReference; /** + * */ public class DirectDruidClient implements QueryRunner { @@ -102,6 +104,8 @@ public class DirectDruidClient implements QueryRunner private final ServiceEmitter emitter; private final AtomicInteger openConnections; + private final AtomicInteger failedConnections; + private final BlockingQueue connFailedTimesQueue = new LinkedBlockingQueue<>(); private final boolean isSmile; private final ScheduledExecutorService queryCancellationExecutor; @@ -142,7 +146,11 @@ public DirectDruidClient( this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.openConnections = new AtomicInteger(); + this.failedConnections = new AtomicInteger(); this.queryCancellationExecutor = queryCancellationExecutor; + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(this::flushFailedConnections, 1, 1, TimeUnit.MINUTES); } public int getNumOpenConnections() @@ -150,6 +158,23 @@ public int getNumOpenConnections() return openConnections.get(); } + public int getNumFailedConnections() + { + flushFailedConnections(); + return failedConnections.get(); + } + + private synchronized void flushFailedConnections() + { + long curTime = System.currentTimeMillis(); + while (!connFailedTimesQueue.isEmpty() && curTime > connFailedTimesQueue.peek() + TimeUnit.MINUTES.toMillis(5)) { + connFailedTimesQueue.poll(); + if (failedConnections.get() > 0) { + failedConnections.getAndDecrement(); + } + } + } + @Override public Sequence run(final QueryPlus queryPlus, final ResponseContext context) { @@ -215,7 +240,11 @@ private InputStream dequeue() throws InterruptedException { final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); if (holder == null) { - throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); + throw new QueryTimeoutException(StringUtils.nonStrictFormat( + "Query[%s] url[%s] timed out.", + query.getId(), + url + )); } final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength()); @@ -452,7 +481,11 @@ private void checkTotalBytesLimit(long bytes) long timeLeft = timeoutAt - System.currentTimeMillis(); if (timeLeft <= 0) { - throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); + throw new QueryTimeoutException(StringUtils.nonStrictFormat( + "Query[%s] url[%s] timed out.", + query.getId(), + url + )); } future = httpClient.go( @@ -484,6 +517,8 @@ public void onSuccess(InputStream result) @Override public void onFailure(Throwable t) { + failedConnections.getAndIncrement(); + connFailedTimesQueue.add(System.currentTimeMillis()); openConnections.getAndDecrement(); if (future.isCancelled()) { cancelQuery(query, cancelUrl); @@ -543,10 +578,14 @@ private void cancelQuery(Query query, String cancelUrl) try { Future responseFuture = httpClient.go( new Request(HttpMethod.DELETE, new URL(cancelUrl)) - .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON), + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ), StatusResponseHandler.getInstance(), - Duration.standardSeconds(1)); + Duration.standardSeconds(1) + ); Runnable checkRunnable = () -> { try { @@ -555,10 +594,12 @@ private void cancelQuery(Query query, String cancelUrl) } StatusResponseHolder response = responseFuture.get(30, TimeUnit.SECONDS); if (response.getStatus().getCode() >= 500) { - log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].", + log.error( + "Error cancelling query[%s]: queriable node returned status[%d] [%s].", query, response.getStatus().getCode(), - response.getStatus().getReasonPhrase()); + response.getStatus().getReasonPhrase() + ); } } catch (ExecutionException | InterruptedException e) { diff --git a/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java new file mode 100644 index 000000000000..c2349acee1d1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.selector; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import org.apache.druid.client.DirectDruidClient; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +public class FailureCountServerSelectorStrategy implements ServerSelectorStrategy +{ + private static final Comparator COMPARATOR = + Comparator.comparingInt(s -> { + DirectDruidClient client = (DirectDruidClient) s.getQueryRunner(); + return client.getNumOpenConnections() + client.getNumFailedConnections(); + }); + + @Nullable + @Override + public QueryableDruidServer pick(Set servers, DataSegment segment) + { + return Collections.min(servers, COMPARATOR); + } + + @Override + public List pick(Set servers, DataSegment segment, int numServersToPick) + { + if (servers.size() <= numServersToPick) { + return ImmutableList.copyOf(servers); + } + return Ordering.from(COMPARATOR).leastOf(servers, numServersToPick); + } +} diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java index 587762068c50..b260da6f8a94 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java @@ -32,7 +32,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class), - @JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class) + @JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class), + @JsonSubTypes.Type(name = "failureCount", value = FailureCountServerSelectorStrategy.class) }) public interface ServerSelectorStrategy { From a7e9502d1fa9716905624e1f1d9b4bea5a3f86c9 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 21 Feb 2024 09:53:58 +0530 Subject: [PATCH 2/9] unit test --- .../druid/client/DirectDruidClient.java | 13 ++++++++++++- .../druid/client/DirectDruidClientTest.java | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 392efeb81b3e..b7be88c95411 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -106,6 +106,8 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final AtomicInteger failedConnections; private final BlockingQueue connFailedTimesQueue = new LinkedBlockingQueue<>(); + public static final int DEFAULT_COOLDOWN_TIME_FOR_CONN_FAILURE = 5; + private int cooldownTimeForConnFailure; private final boolean isSmile; private final ScheduledExecutorService queryCancellationExecutor; @@ -149,6 +151,8 @@ public DirectDruidClient( this.failedConnections = new AtomicInteger(); this.queryCancellationExecutor = queryCancellationExecutor; + this.cooldownTimeForConnFailure = DEFAULT_COOLDOWN_TIME_FOR_CONN_FAILURE; + // might remove this scheduler and entirely depend on whenever getNumFailedConnections is invoked ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(this::flushFailedConnections, 1, 1, TimeUnit.MINUTES); } @@ -167,7 +171,9 @@ public int getNumFailedConnections() private synchronized void flushFailedConnections() { long curTime = System.currentTimeMillis(); - while (!connFailedTimesQueue.isEmpty() && curTime > connFailedTimesQueue.peek() + TimeUnit.MINUTES.toMillis(5)) { + while (!connFailedTimesQueue.isEmpty() + && curTime > connFailedTimesQueue.peek() + TimeUnit.MINUTES.toMillis(cooldownTimeForConnFailure) + ) { connFailedTimesQueue.poll(); if (failedConnections.get() > 0) { failedConnections.getAndDecrement(); @@ -175,6 +181,11 @@ private synchronized void flushFailedConnections() } } + public void setCooldownTimeForConnFailure(int cooldownTimeForConnFailure) + { + this.cooldownTimeForConnFailure = cooldownTimeForConnFailure; + } + @Override public Sequence run(final QueryPlus queryPlus, final ResponseContext context) { diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 4c99d2b54097..368be93887ae 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -24,9 +24,11 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy; +import org.apache.druid.client.selector.FailureCountServerSelectorStrategy; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -55,6 +57,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; +import org.joda.time.Minutes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -88,6 +91,7 @@ public class DirectDruidClientTest 0L ); private ServerSelector serverSelector; + private ServerSelector failCountServerSelector; private HttpClient httpClient; private DirectDruidClient client; @@ -97,11 +101,16 @@ public class DirectDruidClientTest @Before public void setup() { + NullHandling.initializeForTests(); httpClient = EasyMock.createMock(HttpClient.class); serverSelector = new ServerSelector( dataSegment, new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) ); + failCountServerSelector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new FailureCountServerSelectorStrategy()) + ); queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); client = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), @@ -113,6 +122,7 @@ public void setup() new NoopServiceEmitter(), queryCancellationExecutor ); + client.setCooldownTimeForConnFailure(1); queryableDruidServer = new QueryableDruidServer( new DruidServer( "test1", @@ -126,6 +136,7 @@ public void setup() client ); serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); + failCountServerSelector.addServerAndUpdateSegment(queryableDruidServer, failCountServerSelector.getSegment()); } @After @@ -199,6 +210,7 @@ public void testRun() throws Exception client2 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); + failCountServerSelector.addServerAndUpdateSegment(queryableDruidServer2, failCountServerSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); @@ -207,12 +219,15 @@ public void testRun() throws Exception Assert.assertEquals(url, capturedRequest.getValue().getUrl()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(1, client.getNumOpenConnections()); + Assert.assertEquals(0, client.getNumFailedConnections()); // simulate read timeout client.run(QueryPlus.wrap(query)); Assert.assertEquals(2, client.getNumOpenConnections()); + Assert.assertEquals(0, client.getNumFailedConnections()); futureException.setException(new ReadTimeoutException()); Assert.assertEquals(1, client.getNumOpenConnections()); + Assert.assertEquals(1, client.getNumFailedConnections()); // subsequent connections should work client.run(QueryPlus.wrap(query)); @@ -231,6 +246,7 @@ public void testRun() throws Exception Assert.assertEquals(1, results.size()); Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); Assert.assertEquals(3, client.getNumOpenConnections()); + Assert.assertEquals(1, client.getNumFailedConnections()); client2.run(QueryPlus.wrap(query)); client2.run(QueryPlus.wrap(query)); @@ -238,8 +254,11 @@ public void testRun() throws Exception Assert.assertEquals(2, client2.getNumOpenConnections()); Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2); + Assert.assertEquals(failCountServerSelector.pick(null), queryableDruidServer2); EasyMock.verify(httpClient); + Thread.sleep(60 * 1000); + Assert.assertEquals(0, client.getNumFailedConnections()); } @Test From d5ea35498cb4decf94baebcb685dac22ce4ea909 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sun, 25 Feb 2024 22:13:42 +0530 Subject: [PATCH 3/9] updated connection count strategy --- .../druid/client/DirectDruidClient.java | 43 ++------------ .../FailureCountServerSelectorStrategy.java | 56 ------------------- .../selector/ServerSelectorStrategy.java | 3 +- .../druid/client/DirectDruidClientTest.java | 17 ------ 4 files changed, 5 insertions(+), 114 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index b7be88c95411..eed51755c79a 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -74,7 +74,6 @@ import java.util.Enumeration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -104,10 +103,6 @@ public class DirectDruidClient implements QueryRunner private final ServiceEmitter emitter; private final AtomicInteger openConnections; - private final AtomicInteger failedConnections; - private final BlockingQueue connFailedTimesQueue = new LinkedBlockingQueue<>(); - public static final int DEFAULT_COOLDOWN_TIME_FOR_CONN_FAILURE = 5; - private int cooldownTimeForConnFailure; private final boolean isSmile; private final ScheduledExecutorService queryCancellationExecutor; @@ -148,13 +143,7 @@ public DirectDruidClient( this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.openConnections = new AtomicInteger(); - this.failedConnections = new AtomicInteger(); this.queryCancellationExecutor = queryCancellationExecutor; - - this.cooldownTimeForConnFailure = DEFAULT_COOLDOWN_TIME_FOR_CONN_FAILURE; - // might remove this scheduler and entirely depend on whenever getNumFailedConnections is invoked - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(this::flushFailedConnections, 1, 1, TimeUnit.MINUTES); } public int getNumOpenConnections() @@ -162,30 +151,6 @@ public int getNumOpenConnections() return openConnections.get(); } - public int getNumFailedConnections() - { - flushFailedConnections(); - return failedConnections.get(); - } - - private synchronized void flushFailedConnections() - { - long curTime = System.currentTimeMillis(); - while (!connFailedTimesQueue.isEmpty() - && curTime > connFailedTimesQueue.peek() + TimeUnit.MINUTES.toMillis(cooldownTimeForConnFailure) - ) { - connFailedTimesQueue.poll(); - if (failedConnections.get() > 0) { - failedConnections.getAndDecrement(); - } - } - } - - public void setCooldownTimeForConnFailure(int cooldownTimeForConnFailure) - { - this.cooldownTimeForConnFailure = cooldownTimeForConnFailure; - } - @Override public Sequence run(final QueryPlus queryPlus, final ResponseContext context) { @@ -499,6 +464,7 @@ private void checkTotalBytesLimit(long bytes) )); } + openConnections.getAndIncrement(); future = httpClient.go( new Request( HttpMethod.POST, @@ -513,8 +479,6 @@ private void checkTotalBytesLimit(long bytes) ); queryWatcher.registerQueryFuture(query, future); - - openConnections.getAndIncrement(); Futures.addCallback( future, new FutureCallback() @@ -528,8 +492,6 @@ public void onSuccess(InputStream result) @Override public void onFailure(Throwable t) { - failedConnections.getAndIncrement(); - connFailedTimesQueue.add(System.currentTimeMillis()); openConnections.getAndDecrement(); if (future.isCancelled()) { cancelQuery(query, cancelUrl); @@ -541,6 +503,9 @@ public void onFailure(Throwable t) ); } catch (IOException e) { + if (openConnections.get() > 0) { + openConnections.getAndDecrement(); + } throw new RuntimeException(e); } diff --git a/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java deleted file mode 100644 index c2349acee1d1..000000000000 --- a/server/src/main/java/org/apache/druid/client/selector/FailureCountServerSelectorStrategy.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client.selector; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; -import org.apache.druid.client.DirectDruidClient; -import org.apache.druid.timeline.DataSegment; - -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Set; - -public class FailureCountServerSelectorStrategy implements ServerSelectorStrategy -{ - private static final Comparator COMPARATOR = - Comparator.comparingInt(s -> { - DirectDruidClient client = (DirectDruidClient) s.getQueryRunner(); - return client.getNumOpenConnections() + client.getNumFailedConnections(); - }); - - @Nullable - @Override - public QueryableDruidServer pick(Set servers, DataSegment segment) - { - return Collections.min(servers, COMPARATOR); - } - - @Override - public List pick(Set servers, DataSegment segment, int numServersToPick) - { - if (servers.size() <= numServersToPick) { - return ImmutableList.copyOf(servers); - } - return Ordering.from(COMPARATOR).leastOf(servers, numServersToPick); - } -} diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java index b260da6f8a94..587762068c50 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java @@ -32,8 +32,7 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class), - @JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class), - @JsonSubTypes.Type(name = "failureCount", value = FailureCountServerSelectorStrategy.class) + @JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class) }) public interface ServerSelectorStrategy { diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 368be93887ae..19e308e1682f 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy; -import org.apache.druid.client.selector.FailureCountServerSelectorStrategy; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; @@ -57,7 +56,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; -import org.joda.time.Minutes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -91,7 +89,6 @@ public class DirectDruidClientTest 0L ); private ServerSelector serverSelector; - private ServerSelector failCountServerSelector; private HttpClient httpClient; private DirectDruidClient client; @@ -107,10 +104,6 @@ public void setup() dataSegment, new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) ); - failCountServerSelector = new ServerSelector( - dataSegment, - new HighestPriorityTierSelectorStrategy(new FailureCountServerSelectorStrategy()) - ); queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); client = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), @@ -122,7 +115,6 @@ public void setup() new NoopServiceEmitter(), queryCancellationExecutor ); - client.setCooldownTimeForConnFailure(1); queryableDruidServer = new QueryableDruidServer( new DruidServer( "test1", @@ -136,7 +128,6 @@ public void setup() client ); serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); - failCountServerSelector.addServerAndUpdateSegment(queryableDruidServer, failCountServerSelector.getSegment()); } @After @@ -210,7 +201,6 @@ public void testRun() throws Exception client2 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); - failCountServerSelector.addServerAndUpdateSegment(queryableDruidServer2, failCountServerSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); @@ -219,15 +209,12 @@ public void testRun() throws Exception Assert.assertEquals(url, capturedRequest.getValue().getUrl()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(1, client.getNumOpenConnections()); - Assert.assertEquals(0, client.getNumFailedConnections()); // simulate read timeout client.run(QueryPlus.wrap(query)); Assert.assertEquals(2, client.getNumOpenConnections()); - Assert.assertEquals(0, client.getNumFailedConnections()); futureException.setException(new ReadTimeoutException()); Assert.assertEquals(1, client.getNumOpenConnections()); - Assert.assertEquals(1, client.getNumFailedConnections()); // subsequent connections should work client.run(QueryPlus.wrap(query)); @@ -246,7 +233,6 @@ public void testRun() throws Exception Assert.assertEquals(1, results.size()); Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); Assert.assertEquals(3, client.getNumOpenConnections()); - Assert.assertEquals(1, client.getNumFailedConnections()); client2.run(QueryPlus.wrap(query)); client2.run(QueryPlus.wrap(query)); @@ -254,11 +240,8 @@ public void testRun() throws Exception Assert.assertEquals(2, client2.getNumOpenConnections()); Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2); - Assert.assertEquals(failCountServerSelector.pick(null), queryableDruidServer2); EasyMock.verify(httpClient); - Thread.sleep(60 * 1000); - Assert.assertEquals(0, client.getNumFailedConnections()); } @Test From df7aaeeecfe73d62e58dfef75b98d62471751697 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 27 Feb 2024 07:14:21 +0530 Subject: [PATCH 4/9] cleanup --- .../druid/client/DirectDruidClient.java | 4 +- .../druid/client/DirectDruidClientTest.java | 60 +++++++++---------- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index eed51755c79a..cb845451bd9e 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicReference; /** - * */ public class DirectDruidClient implements QueryRunner { @@ -464,7 +463,10 @@ private void checkTotalBytesLimit(long bytes) )); } + // increment is moved up so that if future initialization is queued by some other process, + // we can increment the count earlier so that we can route the request to a different server openConnections.getAndIncrement(); + future = httpClient.go( new Request( HttpMethod.POST, diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 19e308e1682f..04309567797a 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -145,33 +145,33 @@ public void testRun() throws Exception SettableFuture futureResult = SettableFuture.create(); Capture capturedRequest = EasyMock.newCapture(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(futureResult) .times(1); SettableFuture futureException = SettableFuture.create(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(futureException) .times(1); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(SettableFuture.create()) .atLeastOnce(); @@ -252,22 +252,22 @@ public void testCancel() SettableFuture cancellationFuture = SettableFuture.create(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(cancelledFuture) .once(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(cancellationFuture) .anyTimes(); From 1511b632485cf7457c98335b98e75fc4f861e27a Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 27 Feb 2024 08:23:51 +0530 Subject: [PATCH 5/9] Unit Test --- .../druid/client/DirectDruidClient.java | 5 ++- .../druid/client/DirectDruidClientTest.java | 43 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index cb845451bd9e..89d073e27bfa 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -504,10 +504,13 @@ public void onFailure(Throwable t) Execs.directExecutor() ); } - catch (IOException e) { + catch (RuntimeException e) { if (openConnections.get() > 0) { openConnections.getAndDecrement(); } + throw e; + } + catch (IOException e) { throw new RuntimeException(e); } diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 04309567797a..becfebf9c586 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -19,6 +19,8 @@ package org.apache.druid.client; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -429,4 +431,45 @@ public void testQueryTimeoutFromFuture() Assert.assertEquals(hostName, actualException.getHost()); EasyMock.verify(httpClient); } + + @Test + public void testIOException() throws JsonProcessingException + { + ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); + EasyMock.expect(mockObjectMapper.writeValueAsBytes(EasyMock.anyObject())) + .andThrow(new JsonProcessingException("Error"){}); + + DirectDruidClient client2 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + mockObjectMapper, + httpClient, + "http", + hostName, + new NoopServiceEmitter(), + queryCancellationExecutor + ); + + QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( + new DruidServer( + "test1", + "localhost", + null, + 0, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ), + client2 + ); + + serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + + TimeBoundaryQuery finalQuery = query; + Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery))); + Assert.assertEquals(0, client2.getNumOpenConnections()); + } } From d5117db2046ca6eca65fba13d3a1623456ff3378 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 27 Feb 2024 11:44:11 +0530 Subject: [PATCH 6/9] reformat --- .../druid/client/DirectDruidClient.java | 37 +++++++++---------- .../druid/client/DirectDruidClientTest.java | 2 +- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 89d073e27bfa..0cb685c8ef9a 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -466,19 +466,24 @@ private void checkTotalBytesLimit(long bytes) // increment is moved up so that if future initialization is queued by some other process, // we can increment the count earlier so that we can route the request to a different server openConnections.getAndIncrement(); - - future = httpClient.go( - new Request( - HttpMethod.POST, - new URL(url) - ).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft))) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON - ), - responseHandler, - Duration.millis(timeLeft) - ); + try { + future = httpClient.go( + new Request( + HttpMethod.POST, + new URL(url) + ).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft))) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ), + responseHandler, + Duration.millis(timeLeft) + ); + } + catch (RuntimeException e) { + openConnections.getAndDecrement(); + throw e; + } queryWatcher.registerQueryFuture(query, future); Futures.addCallback( @@ -504,12 +509,6 @@ public void onFailure(Throwable t) Execs.directExecutor() ); } - catch (RuntimeException e) { - if (openConnections.get() > 0) { - openConnections.getAndDecrement(); - } - throw e; - } catch (IOException e) { throw new RuntimeException(e); } diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index becfebf9c586..034cd52f1600 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -433,7 +433,7 @@ public void testQueryTimeoutFromFuture() } @Test - public void testIOException() throws JsonProcessingException + public void testRuntimeException() throws JsonProcessingException { ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); EasyMock.expect(mockObjectMapper.writeValueAsBytes(EasyMock.anyObject())) From 8cea4085acd2b6f0832a265dfbb3d8c73156f31a Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 27 Feb 2024 13:29:24 +0530 Subject: [PATCH 7/9] refactor --- .../druid/client/DirectDruidClient.java | 28 ++------ .../druid/client/DirectDruidClientTest.java | 64 ++++++++++--------- 2 files changed, 40 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 0cb685c8ef9a..017e62a74a7c 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -215,11 +215,7 @@ private InputStream dequeue() throws InterruptedException { final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); if (holder == null) { - throw new QueryTimeoutException(StringUtils.nonStrictFormat( - "Query[%s] url[%s] timed out.", - query.getId(), - url - )); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); } final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength()); @@ -456,11 +452,7 @@ private void checkTotalBytesLimit(long bytes) long timeLeft = timeoutAt - System.currentTimeMillis(); if (timeLeft <= 0) { - throw new QueryTimeoutException(StringUtils.nonStrictFormat( - "Query[%s] url[%s] timed out.", - query.getId(), - url - )); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); } // increment is moved up so that if future initialization is queued by some other process, @@ -558,14 +550,10 @@ private void cancelQuery(Query query, String cancelUrl) try { Future responseFuture = httpClient.go( new Request(HttpMethod.DELETE, new URL(cancelUrl)) - .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON - ), + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON), StatusResponseHandler.getInstance(), - Duration.standardSeconds(1) - ); + Duration.standardSeconds(1)); Runnable checkRunnable = () -> { try { @@ -574,12 +562,10 @@ private void cancelQuery(Query query, String cancelUrl) } StatusResponseHolder response = responseFuture.get(30, TimeUnit.SECONDS); if (response.getStatus().getCode() >= 500) { - log.error( - "Error cancelling query[%s]: queriable node returned status[%d] [%s].", + log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].", query, response.getStatus().getCode(), - response.getStatus().getReasonPhrase() - ); + response.getStatus().getReasonPhrase()); } } catch (ExecutionException | InterruptedException e) { diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 034cd52f1600..6c194a096934 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -147,33 +147,33 @@ public void testRun() throws Exception SettableFuture futureResult = SettableFuture.create(); Capture capturedRequest = EasyMock.newCapture(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(futureResult) .times(1); SettableFuture futureException = SettableFuture.create(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(futureException) .times(1); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(SettableFuture.create()) .atLeastOnce(); @@ -254,22 +254,22 @@ public void testCancel() SettableFuture cancellationFuture = SettableFuture.create(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(cancelledFuture) .once(); EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) .andReturn(cancellationFuture) .anyTimes(); @@ -437,7 +437,9 @@ public void testRuntimeException() throws JsonProcessingException { ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); EasyMock.expect(mockObjectMapper.writeValueAsBytes(EasyMock.anyObject())) - .andThrow(new JsonProcessingException("Error"){}); + .andThrow(new JsonProcessingException("Error") + { + }); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), From 57a68d4d0ebd54a0e4e357fa881fef330de051a0 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Mon, 4 Mar 2024 11:33:09 +0530 Subject: [PATCH 8/9] decrement connection count on any exception --- .../main/java/org/apache/druid/client/DirectDruidClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 017e62a74a7c..6a502110058b 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -472,7 +472,7 @@ private void checkTotalBytesLimit(long bytes) Duration.millis(timeLeft) ); } - catch (RuntimeException e) { + catch (Exception e) { openConnections.getAndDecrement(); throw e; } From b6cef59403fbcdd224d2e84b006d90fafb5f6afe Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Mon, 4 Mar 2024 12:33:04 +0530 Subject: [PATCH 9/9] minor refactor --- .../java/org/apache/druid/client/DirectDruidClientTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 6c194a096934..4fffcd6fd358 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerTestHelper; @@ -433,10 +434,10 @@ public void testQueryTimeoutFromFuture() } @Test - public void testRuntimeException() throws JsonProcessingException + public void testConnectionCountAfterException() throws JsonProcessingException { ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); - EasyMock.expect(mockObjectMapper.writeValueAsBytes(EasyMock.anyObject())) + EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class)) .andThrow(new JsonProcessingException("Error") { });