diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceClient.java b/server/src/main/java/org/apache/druid/rpc/ServiceClient.java index cb33f713d988..1c416f88a1dc 100644 --- a/server/src/main/java/org/apache/druid/rpc/ServiceClient.java +++ b/server/src/main/java/org/apache/druid/rpc/ServiceClient.java @@ -49,8 +49,10 @@ public interface ServiceClient * encountered error. * * Redirects from 3xx responses are followed up to a chain length of {@link #MAX_REDIRECTS} and do not consume - * attempts. Redirects are validated against the targets returned by {@link ServiceLocator}: the client will not - * follow a redirect to a target that does not appear in the returned {@link ServiceLocations}. + * attempts. Redirects are validated against the targets returned by {@link ServiceLocator}: the client will only + * follow redirects to targets that appear in {@link ServiceLocations}. If the client encounters a redirect to an + * unknown target, or if a redirect loop or self-redirect is detected, it is treated as an unavailable service and + * an attempt is consumed. * * If the service is unavailable at the time an attempt is made, the client will automatically retry based on * {@link ServiceRetryPolicy#retryNotAvailable()}. If true, an attempt is consumed and the client will try to locate diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java index 1445b943e96e..eca2cfdc5a2e 100644 --- a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -88,7 +89,7 @@ public ListenableFuture asyncRequest( ) { final SettableFuture retVal = SettableFuture.create(); - tryRequest(requestBuilder, handler, retVal, 0, 0); + tryRequest(requestBuilder, handler, retVal, 0, ImmutableSet.of()); return retVal; } @@ -98,21 +99,35 @@ public ServiceClientImpl withRetryPolicy(ServiceRetryPolicy newRetryPolicy) return new ServiceClientImpl(serviceName, httpClient, serviceLocator, newRetryPolicy, connectExec); } + /** + * Internal helper used by {@link #asyncRequest(RequestBuilder, HttpResponseHandler)}. + * + * Handles retries by calling itself back in {@link #connectExec} with an incremented {@code attemptNumber}. + * + * @param requestBuilder request builder from call to {@link #asyncRequest} + * @param handler handler from call to {@link #asyncRequest} + * @param retVal return future generated by {@link #asyncRequest} + * @param attemptNumber attempt number; starts at 0 and is incremented on each retry + * @param redirectLocations redirect locations observed from the server on this attempt; used for detecting redirect + * loops and for limiting redirect chain length to {@link #MAX_REDIRECTS}. Cleared when + * a new attempt is issued. + */ private void tryRequest( final RequestBuilder requestBuilder, final HttpResponseHandler handler, final SettableFuture retVal, final long attemptNumber, - final int redirectCount + final ImmutableSet redirectLocations ) { whenServiceReady( - serviceLocation -> { + serviceLocations -> { if (retVal.isCancelled()) { // Return early if the caller canceled the return future. return; } + final ServiceLocation serviceLocation = pick(serviceLocations); final long nextAttemptNumber = attemptNumber + 1; if (serviceLocation == null) { @@ -128,7 +143,7 @@ private void tryRequest( ); connectExec.schedule( - () -> tryRequest(requestBuilder, handler, retVal, attemptNumber + 1, redirectCount), + () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()), backoffMs, TimeUnit.MILLISECONDS ); @@ -172,71 +187,15 @@ public void onSuccess(@Nullable final Either 1) { - // There were retries. Log at INFO level to provide the user some closure. - log.info( - "Service [%s] request [%s %s] completed.", - serviceName, - request.getMethod(), - request.getUrl() - ); - } else { - // No retries. Log at debug level to avoid cluttering the logs. - log.debug( - "Service [%s] request [%s %s] completed.", - serviceName, - request.getMethod(), - request.getUrl() - ); - } - - // Will not throw, because we checked result.isValue() earlier. - retVal.set(result.valueOrThrow()); + handleResultValue(result.valueOrThrow()); } else { final StringFullResponseHolder errorHolder = result != null ? result.error() : null; if (errorHolder != null && isRedirect(errorHolder.getResponse().getStatus())) { - // Redirect. Update preferredLocationNoPath if appropriate, then reissue. - final String newUri = result.error().getResponse().headers().get("Location"); - - if (redirectCount >= MAX_REDIRECTS) { - retVal.setException(new RpcException( - "Service [%s] redirected too many times [%d] to invalid url %s", - serviceName, - redirectCount, - newUri - )); - } else { - // Update preferredLocationNoPath if we got a redirect. - final ServiceLocation redirectLocationNoPath = serviceLocationNoPathFromUri(newUri); - - if (redirectLocationNoPath != null) { - preferredLocationNoPath.set(redirectLocationNoPath); - connectExec.submit( - () -> tryRequest(requestBuilder, handler, retVal, attemptNumber, redirectCount + 1) - ); - } else { - retVal.setException( - new RpcException( - "Service [%s] redirected [%d] times to invalid URL [%s]", - serviceName, - redirectCount, - newUri - ) - ); - } - } + handleRedirect(errorHolder); } else if (shouldTry(nextAttemptNumber) && (errorHolder == null || retryPolicy.retryHttpResponse(errorHolder.getResponse()))) { - // Retryable server response (or null errorHolder, which means null result, which can happen - // if the HttpClient encounters an exception in the midst of response processing). - final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber); - log.noStackTrace().info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber)); - connectExec.schedule( - () -> tryRequest(requestBuilder, handler, retVal, attemptNumber + 1, redirectCount), - backoffMs, - TimeUnit.MILLISECONDS - ); + handleRetryableErrorResponse(errorHolder); } else if (errorHolder != null) { // Nonretryable server response. retVal.setException(new HttpResponseException(errorHolder)); @@ -264,7 +223,7 @@ public void onFailure(final Throwable t) log.noStackTrace().info(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber)); connectExec.schedule( - () -> tryRequest(requestBuilder, handler, retVal, attemptNumber + 1, redirectCount), + () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()), backoffMs, TimeUnit.MILLISECONDS ); @@ -277,6 +236,135 @@ public void onFailure(final Throwable t) retVal.setException(new RpcException(t, "Service [%s] handler exited unexpectedly", serviceName)); } } + + /** + * Handles HTTP 2xx responses from the server. + */ + private void handleResultValue(final FinalType value) + { + if (nextAttemptNumber > 1) { + // There were retries. Log at INFO level to provide the user some closure. + log.info( + "Service [%s] request [%s %s] completed.", + serviceName, + request.getMethod(), + request.getUrl() + ); + } else { + // No retries. Log at debug level to avoid cluttering the logs. + log.debug( + "Service [%s] request [%s %s] completed.", + serviceName, + request.getMethod(), + request.getUrl() + ); + } + + // Will not throw, because we checked result.isValue() earlier. + retVal.set(value); + } + + /** + * Handles retryable HTTP error responses from the server. + */ + private void handleRetryableErrorResponse(final StringFullResponseHolder errorHolder) + { + // Retryable server response (or null errorHolder, which means null result, which can happen + // if the HttpClient encounters an exception in the midst of response processing). + final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber); + log.info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber)); + connectExec.schedule( + () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()), + backoffMs, + TimeUnit.MILLISECONDS + ); + } + + /** + * Handles HTTP redirect responses from the server. + */ + private void handleRedirect(final StringFullResponseHolder errorHolder) + { + // Redirect. Update preferredLocationNoPath if appropriate, then reissue. + final String newUri = errorHolder.getResponse().headers().get("Location"); + final ServiceLocation redirectLocationNoPath = serviceLocationNoPathFromUri(newUri); + + if (redirectLocationNoPath == null) { + // Redirect to invalid URL. Something is wrong with the server: fail immediately + // without retries. + retVal.setException( + new RpcException( + "Service [%s] redirected to invalid URL [%s]", + serviceName, + newUri + ) + ); + } else if (serviceLocations.getLocations() + .stream() + .anyMatch(loc -> serviceLocationNoPath(loc) + .equals(redirectLocationNoPath))) { + // Valid redirect, to a server that is one of the known locations. + final boolean isRedirectLoop = redirectLocations.contains(newUri); + final boolean isRedirectChainTooLong = redirectLocations.size() >= MAX_REDIRECTS; + + if (isRedirectLoop || isRedirectChainTooLong) { + // Treat redirect loops, or too-long redirect chains, as unavailable services. + if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) { + final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber); + + log.info( + "Service [%s] issued too many redirects on attempt #%d; retrying in %,d ms.", + serviceName, + nextAttemptNumber, + backoffMs + ); + + connectExec.schedule( + () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()), + backoffMs, + TimeUnit.MILLISECONDS + ); + } else { + retVal.setException(new ServiceNotAvailableException(serviceName, "issued too many redirects")); + } + } else { + // Valid redirect. Follow it without incrementing the attempt number. + preferredLocationNoPath.set(redirectLocationNoPath); + final ImmutableSet newRedirectLocations = + ImmutableSet.builder().addAll(redirectLocations).add(newUri).build(); + connectExec.submit( + () -> tryRequest(requestBuilder, handler, retVal, attemptNumber, newRedirectLocations) + ); + } + } else { + // Redirect to a server that is not one of the known locations. Treat service as unavailable. + if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) { + final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber); + + log.info( + "Service [%s] issued redirect to unknown URL [%s] on attempt #%d; retrying in %,d ms.", + serviceName, + newUri, + nextAttemptNumber, + backoffMs + ); + + connectExec.schedule( + () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()), + backoffMs, + TimeUnit.MILLISECONDS + ); + } else { + retVal.setException( + new ServiceNotAvailableException( + serviceName, + "issued redirect to unknown URL [%s]", + newUri + ) + ); + } + } + } }, connectExec ); @@ -285,7 +373,7 @@ public void onFailure(final Throwable t) ); } - private void whenServiceReady(final Consumer callback, final SettableFuture retVal) + private void whenServiceReady(final Consumer callback, final SettableFuture retVal) { Futures.addCallback( serviceLocator.locate(), @@ -300,8 +388,7 @@ public void onSuccess(final ServiceLocations locations) } try { - final ServiceLocation location = pick(locations); - callback.accept(location); + callback.accept(locations); } catch (Throwable t) { // It's a bug if this happens. The purpose of this line is to help us debug what went wrong. @@ -328,10 +415,7 @@ private ServiceLocation pick(final ServiceLocations locations) if (preferred != null) { // Preferred location is set. Use it if it's one of known locations. for (final ServiceLocation location : locations.getLocations()) { - final ServiceLocation locationNoPath = - new ServiceLocation(location.getHost(), location.getPlaintextPort(), location.getTlsPort(), ""); - - if (locationNoPath.equals(preferred)) { + if (serviceLocationNoPath(location).equals(preferred)) { return location; } } @@ -392,6 +476,9 @@ static long computeBackoffMs(final ServiceRetryPolicy retryPolicy, final long at ); } + /** + * Returns a {@link ServiceLocation} without a path component, based on a URI. + */ @Nullable @VisibleForTesting static ServiceLocation serviceLocationNoPathFromUri(@Nullable final String uriString) @@ -423,6 +510,14 @@ static ServiceLocation serviceLocationNoPathFromUri(@Nullable final String uriSt } } + /** + * Returns a {@link ServiceLocation} without its path. + */ + static ServiceLocation serviceLocationNoPath(final ServiceLocation location) + { + return new ServiceLocation(location.getHost(), location.getPlaintextPort(), location.getTlsPort(), ""); + } + @VisibleForTesting static boolean isRedirect(final HttpResponseStatus responseStatus) { diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceNotAvailableException.java b/server/src/main/java/org/apache/druid/rpc/ServiceNotAvailableException.java index 290fa2cf7ab7..843ac2dfa54f 100644 --- a/server/src/main/java/org/apache/druid/rpc/ServiceNotAvailableException.java +++ b/server/src/main/java/org/apache/druid/rpc/ServiceNotAvailableException.java @@ -19,13 +19,20 @@ package org.apache.druid.rpc; +import org.apache.druid.java.util.common.StringUtils; + /** * Returned by {@link ServiceClient#asyncRequest} when a request has failed because the service is not available. */ public class ServiceNotAvailableException extends RpcException { + public ServiceNotAvailableException(final String serviceName, final String reason, final Object... reasonArgs) + { + super("Service [%s] %s", serviceName, StringUtils.format(reason, reasonArgs)); + } + public ServiceNotAvailableException(final String serviceName) { - super("Service [%s] is not available", serviceName); + this(serviceName, "is not available"); } } diff --git a/server/src/test/java/org/apache/druid/rpc/ServiceClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/ServiceClientImplTest.java index 48f922e6ab6b..dc8bba87d139 100644 --- a/server/src/test/java/org/apache/druid/rpc/ServiceClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/ServiceClientImplTest.java @@ -67,6 +67,9 @@ public class ServiceClientImplTest private static final String SERVICE_NAME = "test-service"; private static final ServiceLocation SERVER1 = new ServiceLocation("example.com", -1, 8888, "/q"); private static final ServiceLocation SERVER2 = new ServiceLocation("example.com", -1, 9999, "/q"); + private static final ServiceLocation SERVER3 = new ServiceLocation("example.com", -1, 1111, "/q"); + private static final ServiceLocation SERVER4 = new ServiceLocation("example.com", -1, 2222, "/q"); + private static final ServiceLocation SERVER5 = new ServiceLocation("example.com", -1, 3333, "/q"); private ScheduledExecutorService exec; @@ -270,7 +273,63 @@ public void test_request_followRedirect() throws Exception } @Test - public void test_request_tooManyRedirects() + public void test_request_tooLongRedirectChain() + { + final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.GET, "/foo"); + + // Redirect chain longer than max length. + stubLocatorCall(locations(SERVER1, SERVER2, SERVER3, SERVER4, SERVER5)); + expectHttpCall(requestBuilder, SERVER1) + .thenReturn(redirectResponse(requestBuilder.build(SERVER2).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER2) + .thenReturn(redirectResponse(requestBuilder.build(SERVER3).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER3) + .thenReturn(redirectResponse(requestBuilder.build(SERVER4).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER4) + .thenReturn(redirectResponse(requestBuilder.build(SERVER5).getUrl().toString())); + + serviceClient = makeServiceClient(StandardRetryPolicy.noRetries()); + + final ExecutionException e = Assert.assertThrows( + ExecutionException.class, + () -> doRequest(serviceClient, requestBuilder) + ); + + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ServiceNotAvailableException.class)); + MatcherAssert.assertThat( + e.getCause(), + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects")) + ); + } + + @Test + public void test_request_tooLongRedirectChainRetry() throws Exception + { + final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.GET, "/foo"); + final ImmutableMap expectedResponseObject = ImmutableMap.of("foo", "bar"); + + // Redirect chain longer than max length. Can be followed across retries. + stubLocatorCall(locations(SERVER1, SERVER2, SERVER3, SERVER4, SERVER5)); + expectHttpCall(requestBuilder, SERVER1) + .thenReturn(redirectResponse(requestBuilder.build(SERVER2).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER2) + .thenReturn(redirectResponse(requestBuilder.build(SERVER3).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER3) + .thenReturn(redirectResponse(requestBuilder.build(SERVER4).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER4) + .thenReturn(redirectResponse(requestBuilder.build(SERVER5).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER5) + .thenReturn(valueResponse(expectedResponseObject)); + + serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build()); + + final Map response = doRequest(serviceClient, requestBuilder); + + Assert.assertEquals(expectedResponseObject, response); + } + + @Test + public void test_request_selfRedirectLoop() { final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.GET, "/foo"); @@ -279,17 +338,43 @@ public void test_request_tooManyRedirects() expectHttpCall(requestBuilder, SERVER1) .thenReturn(redirectResponse(requestBuilder.build(SERVER1).getUrl().toString())); - serviceClient = makeServiceClient(StandardRetryPolicy.unlimited()); + serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(10).build()); final ExecutionException e = Assert.assertThrows( ExecutionException.class, () -> doRequest(serviceClient, requestBuilder) ); - MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RpcException.class)); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ServiceNotAvailableException.class)); MatcherAssert.assertThat( e.getCause(), - ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected too many times")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects")) + ); + } + + @Test + public void test_request_twoServerRedirectLoop() + { + final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.GET, "/foo"); + + // Endless redirects between the same two servers. + stubLocatorCall(locations(SERVER1, SERVER2)); + expectHttpCall(requestBuilder, SERVER1) + .thenReturn(redirectResponse(requestBuilder.build(SERVER2).getUrl().toString())); + expectHttpCall(requestBuilder, SERVER2) + .thenReturn(redirectResponse(requestBuilder.build(SERVER1).getUrl().toString())); + + serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(10).build()); + + final ExecutionException e = Assert.assertThrows( + ExecutionException.class, + () -> doRequest(serviceClient, requestBuilder) + ); + + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ServiceNotAvailableException.class)); + MatcherAssert.assertThat( + e.getCause(), + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects")) ); } @@ -314,7 +399,7 @@ public void test_request_redirectInvalid() MatcherAssert.assertThat( e.getCause(), ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString("redirected [0] times to invalid URL [invalid-url]")) + CoreMatchers.containsString("redirected to invalid URL [invalid-url]")) ); } @@ -338,7 +423,7 @@ public void test_request_redirectNil() MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RpcException.class)); MatcherAssert.assertThat( e.getCause(), - ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected [0] times to invalid URL [null]")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected to invalid URL [null]")) ); } @@ -359,10 +444,11 @@ public void test_request_dontFollowRedirectToUnknownServer() () -> doRequest(serviceClient, requestBuilder) ); - MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RpcException.class)); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ServiceNotAvailableException.class)); MatcherAssert.assertThat( e.getCause(), - ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected too many times")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( + "issued redirect to unknown URL [https://example.com:9999/q/foo]")) ); }