diff --git a/CHANGELOG.md b/CHANGELOG.md index 87708d6c4c..59df6b2138 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,76 @@ ## 6.12.0 [unreleased] +### Features +1. [#643](https://github.com/influxdata/influxdb-client-java/pull/643): `ConnectionClosingInterceptor` interceptor closes connections that exceed +a specified maximum lifetime age (TTL). It's beneficial for scenarios where your application requires establishing new connections to the same host after +a predetermined interval. + +The connection to the InfluxDB Enterprise with the `ConnectionClosingInterceptor` can be configured as follows: +```java +package example; + +import java.time.Duration; +import java.util.Collections; + +import okhttp3.OkHttpClient; +import okhttp3.Protocol; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.InfluxDBClientOptions; +import com.influxdb.client.domain.WriteConsistency; +import com.influxdb.rest.ConnectionClosingInterceptor; + +public class InfluxQLExample { + + public static void main(final String[] args) throws InterruptedException { + + // + // Credentials to connect to InfluxDB Enterprise + // + String url = "https://localhost:8086"; + String username = "admin"; + String password = "password"; + String database = "database"; + WriteConsistency consistency = WriteConsistency.ALL; + + // + // Configure underlying HTTP client + // + OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder() + .protocols(Collections.singletonList(Protocol.HTTP_1_1)); + + // + // Use new Connection TTL feature + // + Duration connectionMaxAge = Duration.ofMinutes(1); + ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(connectionMaxAge); + okHttpClientBuilder + .addNetworkInterceptor(interceptor) + .eventListenerFactory(call -> interceptor); + + // + // Configure InfluxDB client + // + InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder() + .url(url) + .org("-") + .authenticateToken(String.format("%s:%s", username, password).toCharArray()) + .bucket(String.format("%s/%s", database, "")) + .consistency(consistency) + .okHttpClient(okHttpClientBuilder); + + // + // Create client and write data + // + try (InfluxDBClient client = InfluxDBClientFactory.create(optionsBuilder.build())) { + + // ... + } + } +} +``` + ## 6.11.0 [2023-12-05] ### Features diff --git a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java new file mode 100644 index 0000000000..e23e8d7ff2 --- /dev/null +++ b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java @@ -0,0 +1,103 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.rest; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Logger; +import javax.annotation.Nonnull; + +import okhttp3.Call; +import okhttp3.Connection; +import okhttp3.EventListener; +import okhttp3.Interceptor; +import okhttp3.Response; +import okhttp3.internal.connection.RealConnection; + +/** + * This interceptor closes connections that exceed a specified maximum lifetime age (TTL). It's beneficial for + * scenarios where your application requires establishing new connections to the same host after a predetermined + * interval. This interceptor is most effective in applications that use a single connection, meaning requests + * are not made in parallel. + *

+ * Caution is advised, as setting a very short interval can lead to performance issues because + * establishing new connections is a resource-intensive operation. + */ +public class ConnectionClosingInterceptor extends EventListener implements Interceptor { + + private static final Logger LOG = Logger.getLogger(ConnectionClosingInterceptor.class.getName()); + + private final ConcurrentMap connectionTimes = new ConcurrentHashMap<>(); + private final long connectionMaxAgeMillis; + + /** + * Create a new interceptor that will close connections older than the given max age. + * + * @param connectionMaxAge the max age of connections, the precision is milliseconds + */ + public ConnectionClosingInterceptor(@Nonnull final Duration connectionMaxAge) { + this.connectionMaxAgeMillis = connectionMaxAge.toMillis(); + } + + @Override + @Nonnull + public Response intercept(@Nonnull final Chain chain) throws IOException { + Connection connection = chain.connection(); + + // + // If the connection is old, mark it to not be reused. + // + if (connection != null && isConnectionOld(connection)) { + if (connection instanceof RealConnection) { + LOG.fine("Marking connection to not be reused: " + connection); + ((RealConnection) connection).noNewExchanges$okhttp(); + connectionTimes.remove(connection); + } else { + LOG.warning("Unable to mark connection to not be reused: " + connection); + } + } + + return chain.proceed(chain.request()); + } + + @Override + public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) { + connectionTimes.putIfAbsent(connection, System.currentTimeMillis()); + } + + /** + * Check if the connection is older than the max age. + * + * @param connection the connection to check + * @return true if the connection is older than the max age + */ + private boolean isConnectionOld(@Nonnull final Connection connection) { + Long time = connectionTimes.get(connection); + if (time == null) { + return false; + } + long age = System.currentTimeMillis() - time; + return age > connectionMaxAgeMillis; + } +} diff --git a/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java b/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java new file mode 100644 index 0000000000..141c9e609a --- /dev/null +++ b/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java @@ -0,0 +1,143 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.rest; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; +import javax.annotation.Nonnull; + +import okhttp3.Call; +import okhttp3.Connection; +import okhttp3.EventListener; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.influxdb.test.AbstractMockServerTest; + +class ITConnectionClosingInterceptor extends AbstractMockServerTest { + + private static final Logger LOG = Logger.getLogger(ITConnectionClosingInterceptor.class.getName()); + + private String url; + private OkHttpClient client; + private ConnectionsListener connectionsListener; + + @BeforeEach + void setUp() { + connectionsListener = new ConnectionsListener(); + url = startMockServer(); + } + + @AfterEach + void tearDown() { + client.connectionPool().evictAll(); + client.dispatcher().executorService().shutdown(); + } + + @Test + public void withoutTTLonConnection() throws Exception { + + client = new OkHttpClient.Builder() + .eventListener(connectionsListener) + .build(); + + callApi(5, 3); + + Assertions.assertThat(connectionsListener.connections).hasSize(1); + Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1); + } + + @Test + public void withTTLonConnection() throws Exception { + + // Use connection TTL of 2 second + ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(Duration.ofSeconds(2)) { + + @Override + public void connectionAcquired(@NotNull Call call, @NotNull Connection connection) { + super.connectionAcquired(call, connection); + + // count the number of connections, the okhttp client can have only one listener => we have to use this + connectionsListener.connections.add(connection); + } + }; + + client = new OkHttpClient.Builder() + .addNetworkInterceptor(interceptor) + .eventListener(interceptor) + .protocols(Collections.singletonList(Protocol.HTTP_1_1)) + .build(); + + callApi(5, 3); + + Assertions.assertThat(connectionsListener.connections).hasSize(3); + Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1); + } + + /** + * Call API by specified times. + * + * @param times the number of times to call API + * @param sleepSeconds the number of seconds to sleep between calls + * @throws IOException if an error occurs + */ + private void callApi(final int times, final int sleepSeconds) throws Exception { + for (int i = 0; i < times; i++) { + mockServer.enqueue(createResponse("")); + + Request request = new Request.Builder() + .url(url) + .build(); + + LOG.info(String.format("Calling API %d", i)); + try (Response response = client.newCall(request).execute()) { + Assertions.assertThat(response.isSuccessful()).isTrue(); + } + + LOG.info(String.format("Sleeping %d seconds; connection counts: %d", sleepSeconds, connectionsListener.connections.size())); + Thread.sleep(sleepSeconds * 1000L); + } + } + + /** + * Event listener that store acquired connections. + */ + private static class ConnectionsListener extends EventListener { + private final Set connections = new HashSet<>(); + + @Override + public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) { + connections.add(connection); + } + } +}