Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
import org.apache.paimon.shade.guava30.com.google.common.net.HttpHeaders;

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import javax.net.ssl.SSLException;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* Defines exponential HTTP request retry interceptor.
*
* <p>The following retrievable IOException
*
* <ul>
* <li>InterruptedIOException
* <li>UnknownHostException
* <li>ConnectException
* <li>NoRouteToHostException
* <li>SSLException
* </ul>
*
* <p>The following retrievable HTTP status codes are defined:
*
* <ul>
* <li>TOO_MANY_REQUESTS (429)
* <li>BAD_GATEWAY (502)
* <li>SERVICE_UNAVAILABLE (503)
* <li>GATEWAY_TIMEOUT (504)
* </ul>
*
* <p>The following retrievable HTTP method which is idempotent are defined:
*
* <ul>
* <li>GET
* <li>HEAD
* <li>PUT
* <li>DELETE
* <li>TRACE
* <li>OPTIONS
* </ul>
*/
public class ExponentialHttpRetryInterceptor implements Interceptor {

private final int maxRetries;
private final Set<Class<? extends IOException>> nonRetriableExceptions;
private final Set<Integer> retrievableCodes;
private final Set<String> retrievableMethods;

public ExponentialHttpRetryInterceptor(int maxRetries) {
this.maxRetries = maxRetries;
this.retrievableMethods =
ImmutableSet.of("GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS");
this.retrievableCodes = ImmutableSet.of(429, 502, 503, 504);
this.nonRetriableExceptions =
ImmutableSet.of(
InterruptedIOException.class,
UnknownHostException.class,
ConnectException.class,
NoRouteToHostException.class,
SSLException.class);
}

@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
Response response = null;

for (int retryCount = 1; ; retryCount++) {
try {
response = chain.proceed(request);
} catch (IOException e) {
if (needRetry(request.method(), e, retryCount)) {
wait(response, retryCount);
continue;
}
}
if (needRetry(response, retryCount)) {
if (response != null) {
response.close();
}
wait(response, retryCount);
} else {
return response;
}
}
}

public boolean needRetry(Response response, int execCount) {
if (execCount > maxRetries) {
return false;
}
return response == null
|| (!response.isSuccessful() && retrievableCodes.contains(response.code()));
}

public boolean needRetry(String method, IOException e, int execCount) {
if (execCount > maxRetries) {
return false;
}
if (!retrievableMethods.contains(method)) {
return false;
}
if (nonRetriableExceptions.contains(e.getClass())) {
return false;
} else {
for (Class<? extends IOException> rejectException : nonRetriableExceptions) {
if (rejectException.isInstance(e)) {
return false;
}
}
}
return true;
}

public long getRetryIntervalInMilliseconds(Response response, int execCount) {
// a server may send a 429 / 503 with a Retry-After header
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
String retryAfterStrInSecond =
response == null ? null : response.header(HttpHeaders.RETRY_AFTER);
Long retryAfter = null;
if (retryAfterStrInSecond != null) {
try {
retryAfter = Long.parseLong(retryAfterStrInSecond) * 1000;
} catch (Throwable ignore) {
}

if (retryAfter != null && retryAfter > 0) {
return retryAfter;
}
}

int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1.0), 64.0);
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1)));

return delayMillis + jitter;
}

private void wait(Response response, int retryCount) throws InterruptedIOException {
try {
Thread.sleep(getRetryIntervalInMilliseconds(response, retryCount));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
}
}
29 changes: 24 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
Expand All @@ -52,6 +54,7 @@ public class HttpClient implements RESTClient {

private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");
private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;

private final OkHttpClient okHttpClient;
private final String uri;
Expand Down Expand Up @@ -191,14 +194,30 @@ private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);

ConnectionPool connectionPool =
new ConnectionPool(
httpClientOptions.maxConnections(),
CONNECTION_KEEP_ALIVE_DURATION_MS,
TimeUnit.MILLISECONDS);
Dispatcher dispatcher = new Dispatcher(executorService);
// set max requests per host use max connections
dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(new Dispatcher(executorService))
.dispatcher(dispatcher)
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT));
httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
httpClientOptions.readTimeout().ifPresent(builder::readTimeout);
.connectionPool(connectionPool)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
.addInterceptor(
new ExponentialHttpRetryInterceptor(
httpClientOptions.maxRetries()));
httpClientOptions
.connectTimeout()
.ifPresent(
timeoutDuration -> {
builder.connectTimeout(timeoutDuration);
builder.readTimeout(timeoutDuration);
});

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,30 @@ public class HttpClientOptions {

private final String uri;
@Nullable private final Duration connectTimeout;
@Nullable private final Duration readTimeout;
private final int threadPoolSize;
private final int maxConnections;
private final int maxRetries;

public HttpClientOptions(
String uri,
@Nullable Duration connectTimeout,
@Nullable Duration readTimeout,
int threadPoolSize) {
int threadPoolSize,
int maxConnections,
int maxRetries) {
this.uri = uri;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.threadPoolSize = threadPoolSize;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
}

public static HttpClientOptions create(Options options) {
return new HttpClientOptions(
options.get(RESTCatalogOptions.URI),
options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
options.get(RESTCatalogOptions.READ_TIMEOUT),
options.get(RESTCatalogOptions.THREAD_POOL_SIZE));
options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
options.get(RESTCatalogOptions.MAX_CONNECTIONS),
options.get(RESTCatalogOptions.MAX_RETIES));
}

public String uri() {
Expand All @@ -60,11 +64,15 @@ public Optional<Duration> connectTimeout() {
return Optional.ofNullable(connectTimeout);
}

public Optional<Duration> readTimeout() {
return Optional.ofNullable(readTimeout);
}

public int threadPoolSize() {
return threadPoolSize;
}

public int maxConnections() {
return maxConnections;
}

public int maxRetries() {
return Math.max(maxRetries, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ public class RESTCatalogOptions {
public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
ConfigOptions.key("rest.client.connection-timeout")
.durationType()
.noDefaultValue()
.defaultValue(Duration.ofSeconds(180))
.withDescription("REST Catalog http client connect timeout.");

public static final ConfigOption<Duration> READ_TIMEOUT =
ConfigOptions.key("rest.client.read-timeout")
.durationType()
.noDefaultValue()
.withDescription("REST Catalog http client read timeout.");
public static final ConfigOption<Integer> MAX_CONNECTIONS =
ConfigOptions.key("rest.client.max-connections")
.intType()
.defaultValue(100)
.withDescription("REST Catalog http client's max connections.");

public static final ConfigOption<Integer> MAX_RETIES =
ConfigOptions.key("rest.client.max-retries")
.intType()
.defaultValue(5)
.withDescription("REST Catalog http client's max retry times.");

public static final ConfigOption<Integer> THREAD_POOL_SIZE =
ConfigOptions.key("rest.client.num-threads")
Expand Down
Loading
Loading