diff --git a/CHANGELOG.md b/CHANGELOG.md index 0931e41c..358a7bcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,18 @@ `"second"` ( in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`). +### Bug Fixes + +1. [#239](https://github.com/InfluxCommunity/influxdb3-java/pull/239): Use write options from `ClientConfig` in + `InfluxDBClientImpl` write methods: + + ```java + public void writeRecord(@Nullable final String record); + public void writeRecords(@Nonnull final List records); + public void writePoint(@Nullable final Point point); + public void writePoints(@Nonnull final List points); + ``` + ## 1.1.0 [2025-05-22] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 61d66cc3..0db64a89 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -81,6 +81,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient { private final RestClient restClient; private final FlightSqlClient flightSqlClient; + private final WriteOptions emptyWriteOptions; /** * Creates an instance using the specified config. @@ -110,11 +111,12 @@ public InfluxDBClientImpl(@Nonnull final ClientConfig config) { this.config = config; this.restClient = restClient != null ? restClient : new RestClient(config); this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config); + this.emptyWriteOptions = new WriteOptions(null); } @Override public void writeRecord(@Nullable final String record) { - writeRecord(record, WriteOptions.DEFAULTS); + writeRecord(record, emptyWriteOptions); } @Override @@ -128,7 +130,7 @@ public void writeRecord(@Nullable final String record, @Nonnull final WriteOptio @Override public void writeRecords(@Nonnull final List records) { - writeRecords(records, WriteOptions.DEFAULTS); + writeRecords(records, emptyWriteOptions); } @Override @@ -138,7 +140,7 @@ public void writeRecords(@Nonnull final List records, @Nonnull final Wri @Override public void writePoint(@Nullable final Point point) { - writePoint(point, WriteOptions.DEFAULTS); + writePoint(point, emptyWriteOptions); } @Override @@ -152,7 +154,7 @@ public void writePoint(@Nullable final Point point, @Nonnull final WriteOptions @Override public void writePoints(@Nonnull final List points) { - writePoints(points, WriteOptions.DEFAULTS); + writePoints(points, emptyWriteOptions); } @Override diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 7d8488e7..49e513da 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -23,15 +23,19 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import io.netty.handler.codec.http.HttpResponseStatus; +import okhttp3.HttpUrl; import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -223,10 +227,8 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, - () -> { - client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()); - } + () -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) ); assertThat(mockServer.getRequestCount()).isEqualTo(1); @@ -242,6 +244,167 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { + " (supported by InfluxDB 3 Core/Enterprise servers only)."); } + @Test + void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecords(List.of("mem,tag=one value=1.0")); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecords(List.of("mem,tag=one value=1.0")); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoint(point); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoint(point); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoints(List.of(point)); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoints(List.of(point)); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + private void checkWriteCalled(final String expectedPath, final String expectedDB, + final String expectedPrecision, final boolean expectedNoSync, + final boolean expectedGzip) throws InterruptedException { + RecordedRequest request = assertThatServerRequested(); + HttpUrl requestUrl = request.getRequestUrl(); + assertThat(requestUrl).isNotNull(); + assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); + if (expectedNoSync) { + assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); + } else { + assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); + } + assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision); + if (expectedNoSync) { + assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true"); + } else { + assertThat(requestUrl.queryParameter("no_sync")).isNull(); + } + if (expectedGzip) { + assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); + } else { + assertThat(request.getHeader("Content-Encoding")).isNull(); + } + } + + @NotNull + private RecordedRequest assertThatServerRequested() throws InterruptedException { + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + return request; + } + @Test void allParameterSpecified() throws InterruptedException { mockServer.enqueue(createResponse(200)); @@ -344,7 +507,7 @@ void defaultTags() throws InterruptedException { } @Test - public void retryHandled429Test() throws InterruptedException { + public void retryHandled429Test() { mockServer.enqueue(createResponse(429) .setBody("{ \"message\" : \"Too Many Requests\" }") .setHeader("retry-after", "42") diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index eb00e382..e4069233 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -278,6 +278,9 @@ public void testQueryRows() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test public void testQueryRowWithOptions() throws Exception { try (InfluxDBClient client = InfluxDBClient.getInstance(