From 908facbc16e72f67c81fb44c4b957e9356042cb0 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Fri, 23 May 2025 00:24:53 +0200 Subject: [PATCH 01/13] feat: fast no-sync write support --- .../influxdb/v3/client/InfluxDBClient.java | 3 + .../v3/client/config/ClientConfig.java | 48 +++++++++- .../client/internal/InfluxDBClientImpl.java | 43 ++++++--- .../v3/client/write/WriteOptions.java | 87 ++++++++++++++++-- .../client/write/WritePrecisionConverter.java | 34 +++++++ .../v3/client/AbstractMockServerTest.java | 5 ++ .../v3/client/InfluxDBClientWriteTest.java | 54 +++++++++++- .../v3/client/config/ClientConfigTest.java | 88 ++++++++++++++++++- .../v3/client/write/WriteOptionsTest.java | 21 ++++- .../write/WritePrecisionConverterTest.java | 43 +++++++++ 10 files changed, 401 insertions(+), 25 deletions(-) create mode 100644 src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java create mode 100644 src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index 99604916..723a8223 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -445,6 +445,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { *
  • database - database (bucket) name
  • *
  • precision - timestamp precision when writing data
  • *
  • gzipThreshold - payload size size for gzipping data
  • + *
  • writeNoSync - skip waiting for WAL persistence on write
  • * * * @param connectionString connection string @@ -477,6 +478,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
  • INFLUX_DATABASE - database (bucket) name
  • *
  • INFLUX_PRECISION - timestamp precision when writing data
  • *
  • INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
  • + *
  • INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
  • * * Supported system properties: * * * @return instance of {@link InfluxDBClient} diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index ad6a9d4f..fb1983ae 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -36,6 +36,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; /** @@ -52,6 +53,7 @@ *
  • writePrecision - precision to use when writing points to InfluxDB
  • *
  • defaultTags - defaultTags added when writing points to InfluxDB
  • *
  • gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
  • + *
  • writeNoSync - skip waiting for WAL persistence on write
  • *
  • responseTimeout - timeout when connecting to InfluxDB
  • *
  • allowHttpRedirects - allow redirects for InfluxDB connections
  • *
  • disableServerCertificateValidation - @@ -73,6 +75,7 @@ * .database("my-database") * .writePrecision(WritePrecision.S) * .gzipThreshold(4096) + * .writeNoSync(true) * .proxyUrl("http://localhost:10000") * .build(); * @@ -94,6 +97,7 @@ public final class ClientConfig { private final String database; private final WritePrecision writePrecision; private final Integer gzipThreshold; + private final Boolean writeNoSync; private final Map defaultTags; private final Duration timeout; private final Boolean allowHttpRedirects; @@ -180,6 +184,16 @@ public Integer getGzipThreshold() { return gzipThreshold; } + /** + * Skip waiting for WAL persistence on write? + * + * @return skip waiting for WAL persistence on write + */ + @Nonnull + public Boolean getWriteNoSync() { + return writeNoSync; + } + /** * Gets default tags used when writing points. * @return default tags @@ -295,6 +309,7 @@ public boolean equals(final Object o) { && Objects.equals(database, that.database) && writePrecision == that.writePrecision && Objects.equals(gzipThreshold, that.gzipThreshold) + && Objects.equals(writeNoSync, that.writeNoSync) && Objects.equals(defaultTags, that.defaultTags) && Objects.equals(timeout, that.timeout) && Objects.equals(allowHttpRedirects, that.allowHttpRedirects) @@ -309,7 +324,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, - database, writePrecision, gzipThreshold, + database, writePrecision, gzipThreshold, writeNoSync, timeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, defaultTags, sslRootsFilePath); @@ -323,6 +338,7 @@ public String toString() { .add("database='" + database + "'") .add("writePrecision=" + writePrecision) .add("gzipThreshold=" + gzipThreshold) + .add("writeNoSync=" + writeNoSync) .add("timeout=" + timeout) .add("allowHttpRedirects=" + allowHttpRedirects) .add("disableServerCertificateValidation=" + disableServerCertificateValidation) @@ -348,6 +364,7 @@ public static final class Builder { private String database; private WritePrecision writePrecision; private Integer gzipThreshold; + private Boolean writeNoSync; private Map defaultTags; private Duration timeout; private Boolean allowHttpRedirects; @@ -452,6 +469,19 @@ public Builder gzipThreshold(@Nullable final Integer gzipThreshold) { return this; } + /** + * Sets whether to skip waiting for WAL persistence on write. + * + * @param writeNoSync skip waiting for WAL persistence on write + * @return this + */ + @Nonnull + public Builder writeNoSync(@Nullable final Boolean writeNoSync) { + + this.writeNoSync = writeNoSync; + return this; + } + /** * Sets default tags to be written with points. * @@ -634,6 +664,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform if (parameters.containsKey("gzipThreshold")) { this.gzipThreshold(Integer.parseInt(parameters.get("gzipThreshold"))); } + if (parameters.containsKey("writeNoSync")) { + this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync"))); + } return new ClientConfig(this); } @@ -682,6 +715,10 @@ public ClientConfig build(@Nonnull final Map env, final Properti if (gzipThreshold != null) { this.gzipThreshold(Integer.parseInt(gzipThreshold)); } + final String writeNoSync = get.apply("INFLUX_WRITE_NO_SYNC", "influx.writeNoSync"); + if (writeNoSync != null) { + this.writeNoSync(Boolean.parseBoolean(writeNoSync)); + } return new ClientConfig(this); } @@ -690,15 +727,19 @@ private WritePrecision parsePrecision(@Nonnull final String precision) { WritePrecision writePrecision; switch (precision) { case "ns": + case "nanosecond": writePrecision = WritePrecision.NS; break; case "us": + case "microsecond": writePrecision = WritePrecision.US; break; case "ms": + case "millisecond": writePrecision = WritePrecision.MS; break; case "s": + case "second": writePrecision = WritePrecision.S; break; default: @@ -716,8 +757,9 @@ private ClientConfig(@Nonnull final Builder builder) { authScheme = builder.authScheme; organization = builder.organization; database = builder.database; - writePrecision = builder.writePrecision != null ? builder.writePrecision : WritePrecision.NS; - gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : 1000; + writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION; + gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD; + writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC; defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(10); allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false; diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index aadf3ccf..63145e7a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -37,15 +37,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.influxdb.v3.client.*; +import com.influxdb.v3.client.write.WritePrecisionConverter; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.arrow.flight.CallOption; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; -import com.influxdb.v3.client.InfluxDBApiException; -import com.influxdb.v3.client.InfluxDBClient; -import com.influxdb.v3.client.Point; -import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.write.WriteOptions; @@ -274,11 +273,27 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti WritePrecision precision = options.precisionSafe(config); - Map queryParams = new HashMap<>() {{ - put("bucket", database); - put("org", config.getOrganization()); - put("precision", precision.name().toLowerCase()); - }}; + String path; + Map queryParams; + boolean noSync = options.noSyncSafe(config); + if (noSync) { + // Setting no_sync=true is supported only in the v3 API. + path = "api/v3/write_lp"; + queryParams = new HashMap<>() {{ + put("org", config.getOrganization()); + put("db", database); + put("precision", WritePrecisionConverter.toV3ApiString(precision)); + put("no_sync", "true"); + }}; + } else { + // By default, use the v2 API. + path = "api/v2/write"; + queryParams = new HashMap<>() {{ + put("org", config.getOrganization()); + put("bucket", database); + put("precision", WritePrecisionConverter.toV2ApiString(precision)); + }}; + } Map defaultTags = options.defaultTagsSafe(config); @@ -314,7 +329,15 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } headers.putAll(options.headersSafe()); - restClient.request("api/v2/write", HttpMethod.POST, body, queryParams, headers); + try { + restClient.request(path, HttpMethod.POST, body, queryParams, headers); + } catch (InfluxDBApiHttpException e) { + if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { + // Server does not support the v3 write API, can't use the NoSync option. + throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true (supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + } + throw e; + } } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index 5012db6c..e8192e46 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -61,14 +61,19 @@ public final class WriteOptions { * Default GZIP threshold. */ public static final Integer DEFAULT_GZIP_THRESHOLD = 1000; + /** + * Default NoSync. + */ + public static final boolean DEFAULT_NO_SYNC = false; /** * Default WriteOptions. */ - public static final WriteOptions DEFAULTS = new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD); + public static final WriteOptions DEFAULTS = new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null); private final String database; private final WritePrecision precision; private final Integer gzipThreshold; + private final Boolean noSync; private final Map defaultTags; private final Map headers; @@ -85,7 +90,7 @@ public final class WriteOptions { public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold) { - this(database, precision, gzipThreshold, null); + this(database, precision, gzipThreshold, null, null, null); } /** @@ -103,7 +108,26 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Map defaultTags) { - this(database, precision, gzipThreshold, defaultTags, null); + this(database, precision, gzipThreshold, null, defaultTags, null); + } + + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync) { + this(database, precision, gzipThreshold, noSync, null, null); } /** @@ -113,7 +137,7 @@ public WriteOptions(@Nullable final String database, * The headers specified here are preferred over the headers specified in the client configuration. */ public WriteOptions(@Nullable final Map headers) { - this(null, null, null, null, headers); + this(null, null, null, null, null, headers); } /** @@ -135,9 +159,35 @@ public WriteOptions(@Nullable final String database, @Nullable final Integer gzipThreshold, @Nullable final Map defaultTags, @Nullable final Map headers) { + this(database, precision, gzipThreshold, null, defaultTags, headers); + } + + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Map defaultTags, + @Nullable final Map headers) { this.database = database; this.precision = precision; this.gzipThreshold = gzipThreshold; + this.noSync = noSync; this.defaultTags = defaultTags == null ? Map.of() : defaultTags; this.headers = headers == null ? Map.of() : headers; } @@ -189,6 +239,16 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) { : (config.getGzipThreshold() != null ? config.getGzipThreshold() : DEFAULT_GZIP_THRESHOLD); } + /** + * @param config with default value + * @return Skip waiting for WAL persistence on write. + */ + public boolean noSyncSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return noSync != null ? noSync + : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC); + } + /** * @return The headers to be added to write request. */ @@ -209,13 +269,14 @@ public boolean equals(final Object o) { return Objects.equals(database, that.database) && precision == that.precision && Objects.equals(gzipThreshold, that.gzipThreshold) + && Objects.equals(noSync, that.noSync) && defaultTags.equals(that.defaultTags) && headers.equals(that.headers); } @Override public int hashCode() { - return Objects.hash(database, precision, gzipThreshold, defaultTags, headers); + return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, headers); } private boolean isNotDefined(final String option) { @@ -231,6 +292,7 @@ public static final class Builder { private String database; private WritePrecision precision; private Integer gzipThreshold; + private Boolean noSync; private Map defaultTags = new HashMap<>(); private Map headers = new HashMap<>(); @@ -273,6 +335,19 @@ public Builder gzipThreshold(@Nonnull final Integer gzipThreshold) { return this; } + /** + * Sets whether to skip waiting for WAL persistence on write. + * + * @param noSync skip waiting for WAL persistence on write. + * @return this + */ + @Nonnull + public Builder noSync(@Nonnull final Boolean noSync) { + + this.noSync = noSync; + return this; + } + /** * Sets defaultTags. * @@ -310,6 +385,6 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this(builder.database, builder.precision, builder.gzipThreshold, builder.defaultTags, builder.headers); + this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, builder.headers); } } diff --git a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java new file mode 100644 index 00000000..19238fad --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java @@ -0,0 +1,34 @@ +package com.influxdb.v3.client.write; + +public class WritePrecisionConverter { + + public static String toV2ApiString(WritePrecision precision) { + switch (precision) { + case NS: + return "ns"; + case US: + return "us"; + case MS: + return "ms"; + case S: + return "s"; + default: + throw new IllegalArgumentException("Unsupported precision '" + precision + "'"); + } + } + + public static String toV3ApiString(WritePrecision precision) { + switch (precision) { + case NS: + return "nanosecond"; + case US: + return "microsecond"; + case MS: + return "millisecond"; + case S: + return "second"; + default: + throw new IllegalArgumentException("Unsupported precision '" + precision + "'"); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java index 4ade5530..66f98a2d 100644 --- a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java +++ b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java @@ -53,6 +53,11 @@ protected void shutdownMockServer() throws IOException { mockServer.shutdown(); } + @Nonnull + protected MockResponse createEmptyResponse(final int responseCode) { + return new MockResponse().setResponseCode(responseCode); + } + @Nonnull protected MockResponse createResponse(final int responseCode) { diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 840131b0..179ad5b3 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Map; +import io.netty.handler.codec.http.HttpResponseStatus; import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -183,12 +184,63 @@ void gzipParameterSpecified() throws InterruptedException { Assertions.assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); } + @Test + void writeNoSyncFalse_UsesV2API() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build()); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v2/write"); + Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); + + } + + @Test + void writeNoSyncTrue_UsesV3API() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build()); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); + Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("nanosecond"); + } + + @Test + void writeNoSyncTrueOnV2Server_ThrowsException() throws InterruptedException { + mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); + + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, () -> + client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) + ); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); + Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("millisecond"); + + Assertions.assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); + Assertions.assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true (supported by InfluxDB 3 Core/Enterprise servers only)."); + } + @Test void allParameterSpecified() throws InterruptedException { mockServer.enqueue(createResponse(200)); client.writeRecord("mem,tag=one value=1.0", - new WriteOptions("your-database", WritePrecision.S, 1)); + new WriteOptions("your-database", WritePrecision.S, 1, false)); Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index d2d772f4..48b7d617 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; +import com.influxdb.v3.client.write.WriteOptions; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -74,19 +75,21 @@ void toStringConfig() { Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true); Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true); + Assertions.assertThat(configString).contains("writeNoSync=false"); } @Test void fromConnectionString() throws MalformedURLException { ClientConfig cfg = new ClientConfig.Builder() .build("http://localhost:9999/" - + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128"); + + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true"); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray()); Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org"); Assertions.assertThat(cfg.getDatabase()).isEqualTo("my-db"); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128); + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -97,6 +100,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getDatabase()).isEqualTo(null); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -107,6 +111,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getDatabase()).isEqualTo(null); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -126,6 +131,19 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getAuthScheme()).isEqualTo("my-auth"); } + @Test + void fromConnectionString_longPrecision() throws MalformedURLException { + ClientConfig cfg; + cfg = new ClientConfig.Builder().build("http://localhost:9999/?token=x&precision=nanosecond"); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); + cfg = new ClientConfig.Builder().build("http://localhost:9999/?token=x&precision=microsecond"); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); + cfg = new ClientConfig.Builder().build("http://localhost:9999/?token=x&precision=millisecond"); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); + cfg = new ClientConfig.Builder().build("http://localhost:9999/?token=x&precision=second"); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.S); + } + @Test void fromEnv() { // minimal @@ -179,7 +197,8 @@ void fromEnv() { "INFLUX_ORG", "my-org", "INFLUX_DATABASE", "my-db", "INFLUX_PRECISION", "ms", - "INFLUX_GZIP_THRESHOLD", "64" + "INFLUX_GZIP_THRESHOLD", "64", + "INFLUX_WRITE_NO_SYNC", "true" ); cfg = new ClientConfig.Builder() .build(env, null); @@ -189,6 +208,37 @@ void fromEnv() { Assertions.assertThat(cfg.getDatabase()).isEqualTo("my-db"); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + } + + @Test + void fromEnv_longPrecision() { + Map baseEnv = Map.of( + "INFLUX_HOST", "http://localhost:9999/", + "INFLUX_TOKEN", "my-token" + ); + Map env; + ClientConfig cfg; + + env = new HashMap<>(baseEnv); + env.put("INFLUX_PRECISION", "nanosecond"); + cfg = new ClientConfig.Builder().build(env, null); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); + + env = new HashMap<>(baseEnv); + env.put("INFLUX_PRECISION", "microsecond"); + cfg = new ClientConfig.Builder().build(env, null); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); + + env = new HashMap<>(baseEnv); + env.put("INFLUX_PRECISION", "millisecond"); + cfg = new ClientConfig.Builder().build(env, null); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); + + env = new HashMap<>(baseEnv); + env.put("INFLUX_PRECISION", "second"); + cfg = new ClientConfig.Builder().build(env, null); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.S); } @Test @@ -206,6 +256,7 @@ void fromSystemProperties() { // these are defaults Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); // basic properties = new Properties(); @@ -242,6 +293,7 @@ void fromSystemProperties() { properties.put("influx.database", "my-db"); properties.put("influx.precision", "ms"); properties.put("influx.gzipThreshold", "64"); + properties.put("influx.writeNoSync", "true"); cfg = new ClientConfig.Builder() .build(new HashMap<>(), properties); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); @@ -250,5 +302,37 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getDatabase()).isEqualTo("my-db"); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); + Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); } + + @Test + void fromSystemProperties_longPrecision() throws MalformedURLException { + Properties baseProps = new Properties(); + baseProps.put("influx.host", "http://localhost:9999/"); + baseProps.put("influx.token", "my-token"); + + Properties props; + ClientConfig cfg; + + props = new Properties(baseProps); + props.put("influx.precision", "nanosecond"); + cfg = new ClientConfig.Builder().build(new HashMap<>(), props); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); + + props = new Properties(baseProps); + props.put("influx.precision", "microsecond"); + cfg = new ClientConfig.Builder().build(new HashMap<>(), props); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); + + props = new Properties(baseProps); + props.put("influx.precision", "millisecond"); + cfg = new ClientConfig.Builder().build(new HashMap<>(), props); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); + + props = new Properties(baseProps); + props.put("influx.precision", "second"); + cfg = new ClientConfig.Builder().build(new HashMap<>(), props); + Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.S); + } + } diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java index ed621cb8..b63d98b4 100644 --- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java @@ -55,9 +55,9 @@ void optionsBasics() { @Test void optionsEqualAll() { - WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512); + WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true); WriteOptions optionsViaBuilder = new WriteOptions.Builder() - .database("my-database").precision(WritePrecision.S).gzipThreshold(512).build(); + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build(); Assertions.assertThat(options).isEqualTo(optionsViaBuilder); } @@ -115,13 +115,15 @@ void optionsOverrideAll() { .organization("my-org") .writePrecision(WritePrecision.S) .gzipThreshold(512) + .writeNoSync(false) .build(); - WriteOptions options = new WriteOptions("your-database", WritePrecision.US, 4096); + WriteOptions options = new WriteOptions("your-database", WritePrecision.US, 4096, true); Assertions.assertThat(options.databaseSafe(config)).isEqualTo("your-database"); Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WritePrecision.US); Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(4096); + Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(true); } @Test @@ -172,6 +174,19 @@ void optionsOverrideGzipThreshold() { Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(4096); } + @Test + void optionsOverrideWriteNoSync() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeNoSync(true) + .build(); + + WriteOptions options = new WriteOptions.Builder().noSync(false).build(); + + Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false); + } + @Test void optionsOverridesDefaultTags() { Map defaultTagsBase = new HashMap<>() {{ diff --git a/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java new file mode 100644 index 00000000..136b42fb --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java @@ -0,0 +1,43 @@ +package com.influxdb.v3.client.write; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class WritePrecisionConverterTest { + @Test + void toV2ApiString() { + Map testCases = Map.of( + WritePrecision.NS, "ns", + WritePrecision.US, "us", + WritePrecision.MS, "ms", + WritePrecision.S, "s" + ); + + for (Map.Entry e : testCases.entrySet()) { + WritePrecision precision = e.getKey(); + String expectedString = e.getValue(); + String result = WritePrecisionConverter.toV2ApiString(precision); + assertEquals(expectedString, result, "Failed for precision: " + precision); + } + } + + @Test + void toV3ApiString() { + Map tc = Map.of( + WritePrecision.NS, "nanosecond", + WritePrecision.US, "microsecond", + WritePrecision.MS, "millisecond", + WritePrecision.S, "second" + ); + + for (Map.Entry e : tc.entrySet()) { + WritePrecision precision = e.getKey(); + String expectedString = e.getValue(); + String result = WritePrecisionConverter.toV3ApiString(precision); + assertEquals(expectedString, result, "Failed for precision: " + precision); + } + } +} From 07ce7daefb850badc04251dbd6b18f311343a08b Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Mon, 26 May 2025 10:15:16 +0200 Subject: [PATCH 02/13] lint: fix import order --- .../influxdb/v3/client/internal/InfluxDBClientImpl.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 63145e7a..2f44a0bb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -37,18 +37,22 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import com.influxdb.v3.client.*; -import com.influxdb.v3.client.write.WritePrecisionConverter; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.arrow.flight.CallOption; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import com.influxdb.v3.client.InfluxDBApiException; +import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; +import com.influxdb.v3.client.write.WritePrecisionConverter; /** * Implementation of the InfluxDBClient. It is thread-safe and can be safely shared between threads. From b613133d75393f2bc6ce6448823aad5fe9b0473a Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Mon, 26 May 2025 10:29:47 +0200 Subject: [PATCH 03/13] lint: fix lint issues --- .../influxdb/v3/client/internal/InfluxDBClientImpl.java | 3 ++- .../java/com/influxdb/v3/client/write/WriteOptions.java | 6 ++++-- .../influxdb/v3/client/write/WritePrecisionConverter.java | 7 +++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 2f44a0bb..3fd97f24 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -338,7 +338,8 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } catch (InfluxDBApiHttpException e) { if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { // Server does not support the v3 write API, can't use the NoSync option. - throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true (supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " + + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); } throw e; } diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index e8192e46..81f842a9 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -68,7 +68,8 @@ public final class WriteOptions { /** * Default WriteOptions. */ - public static final WriteOptions DEFAULTS = new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null); + public static final WriteOptions DEFAULTS = new WriteOptions( + null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null); private final String database; private final WritePrecision precision; @@ -385,6 +386,7 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, builder.headers); + this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, + builder.headers); } } diff --git a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java index 19238fad..a6f0d11d 100644 --- a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java +++ b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java @@ -2,7 +2,10 @@ public class WritePrecisionConverter { - public static String toV2ApiString(WritePrecision precision) { + private WritePrecisionConverter() { + } + + public static String toV2ApiString(final WritePrecision precision) { switch (precision) { case NS: return "ns"; @@ -17,7 +20,7 @@ public static String toV2ApiString(WritePrecision precision) { } } - public static String toV3ApiString(WritePrecision precision) { + public static String toV3ApiString(final WritePrecision precision) { switch (precision) { case NS: return "nanosecond"; From 1b0c46b40d239a14bba0a766b215133c7c4ef515 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Mon, 26 May 2025 10:38:19 +0200 Subject: [PATCH 04/13] lint: fix lint issues --- .../client/internal/InfluxDBClientImpl.java | 4 +- .../client/write/WritePrecisionConverter.java | 2 +- .../v3/client/InfluxDBClientWriteTest.java | 183 +++++++++--------- .../v3/client/config/ClientConfigTest.java | 8 +- .../write/WritePrecisionConverterTest.java | 4 +- 5 files changed, 104 insertions(+), 97 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 3fd97f24..cc0613a2 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -338,8 +338,8 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } catch (InfluxDBApiHttpException e) { if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { // Server does not support the v3 write API, can't use the NoSync option. - throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " + - "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " + + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); } throw e; } diff --git a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java index a6f0d11d..89935a03 100644 --- a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java +++ b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java @@ -1,6 +1,6 @@ package com.influxdb.v3.client.write; -public class WritePrecisionConverter { +public final class WritePrecisionConverter { private WritePrecisionConverter() { } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 179ad5b3..7d8488e7 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -35,6 +35,7 @@ import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; class InfluxDBClientWriteTest extends AbstractMockServerTest { @@ -68,7 +69,7 @@ void writeEmptyBatch() { client.writeRecords(Collections.singletonList(null)); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @Test @@ -77,7 +78,7 @@ void writeNullRecord() { client.writeRecord(null); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @Test @@ -86,7 +87,7 @@ void writeNullPoint() { client.writePoint(null); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @Test @@ -95,11 +96,11 @@ void databaseParameter() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0"); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database"); } @Test @@ -108,11 +109,11 @@ void databaseParameterSpecified() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().database("my-database-2").build()); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database-2"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database-2"); } @Test @@ -126,7 +127,7 @@ void databaseParameterRequired() throws Exception { .hasMessage("Please specify the 'Database' as a method parameter or use " + "default configuration at 'ClientConfig.database'."); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @@ -136,11 +137,11 @@ void precisionParameter() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0"); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); } @Test @@ -149,11 +150,11 @@ void precisionParameterSpecified() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.S).build()); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("s"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("s"); } @Test @@ -162,12 +163,12 @@ void gzipParameter() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0"); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); - Assertions.assertThat(request.getHeader("Content-Encoding")).isNotEqualTo("gzip"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); + assertThat(request.getHeader("Content-Encoding")).isNotEqualTo("gzip"); } @Test @@ -176,63 +177,69 @@ void gzipParameterSpecified() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().gzipThreshold(1).build()); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); - Assertions.assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); + assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); } @Test - void writeNoSyncFalse_UsesV2API() throws InterruptedException { + void writeNoSyncFalseUsesV2API() throws InterruptedException { mockServer.enqueue(createResponse(200)); - client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build()); + client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build()); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v2/write"); - Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isNull(); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v2/write"); + assertThat(request.getRequestUrl().queryParameter("no_sync")).isNull(); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); } @Test - void writeNoSyncTrue_UsesV3API() throws InterruptedException { + void writeNoSyncTrueUsesV3API() throws InterruptedException { mockServer.enqueue(createResponse(200)); - client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build()); + client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build()); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("nanosecond"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("nanosecond"); } @Test - void writeNoSyncTrueOnV2Server_ThrowsException() throws InterruptedException { + void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); - InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, () -> - client.writeRecord("mem,tag=one value=1.0", new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, + () -> { + client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()); + } ); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - Assertions.assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("millisecond"); - - Assertions.assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - Assertions.assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true (supported by InfluxDB 3 Core/Enterprise servers only)."); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getRequestUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getRequestUrl().queryParameter("no_sync")).isEqualTo("true"); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("millisecond"); + + assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); + assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true" + + " (supported by InfluxDB 3 Core/Enterprise servers only)."); } @Test @@ -242,14 +249,14 @@ void allParameterSpecified() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0", new WriteOptions("your-database", WritePrecision.S, 1, false)); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getRequestUrl()).isNotNull(); - Assertions.assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); - Assertions.assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); - Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("s"); - Assertions.assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("your-database"); + assertThat(request).isNotNull(); + assertThat(request.getRequestUrl()).isNotNull(); + assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); + assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); + assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("s"); + assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("your-database"); } @Test @@ -258,11 +265,11 @@ void contentHeaders() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0"); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); - Assertions.assertThat(request.getHeader("Content-Encoding")).isNull(); + assertThat(request).isNotNull(); + assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); + assertThat(request.getHeader("Content-Encoding")).isNull(); } @Test @@ -271,10 +278,10 @@ void bodyRecord() throws InterruptedException { client.writeRecord("mem,tag=one value=1.0"); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0"); + assertThat(request).isNotNull(); + assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0"); } @Test @@ -287,10 +294,10 @@ void bodyPoint() throws InterruptedException { client.writePoint(point); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0"); + assertThat(request).isNotNull(); + assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0"); } @Test @@ -307,10 +314,10 @@ void bodyConcat() throws InterruptedException { client.writePoints(Arrays.asList(point1, point2)); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0\ncpu,tag=two value=2.0"); + assertThat(request).isNotNull(); + assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0\ncpu,tag=two value=2.0"); } @@ -328,11 +335,11 @@ void defaultTags() throws InterruptedException { client.writePoint(point, options); - Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); - Assertions.assertThat(request).isNotNull(); - Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,model=M5,tag=one,unit=U2 value=1.0"); + assertThat(request).isNotNull(); + assertThat(request.getBody().readUtf8()).isEqualTo("mem,model=M5,tag=one,unit=U2 value=1.0"); } @@ -350,16 +357,16 @@ public void retryHandled429Test() throws InterruptedException { Throwable thrown = catchThrowable(() -> client.writePoint(point)); - Assertions.assertThat(thrown).isNotNull(); - Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); + assertThat(thrown).isNotNull(); + assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); InfluxDBApiHttpException he = (InfluxDBApiHttpException) thrown; - Assertions.assertThat(he.headers()).isNotNull(); - Assertions.assertThat(he.getHeader("retry-after").get(0)) + assertThat(he.headers()).isNotNull(); + assertThat(he.getHeader("retry-after").get(0)) .isNotNull().isEqualTo("42"); - Assertions.assertThat(he.getHeader("content-type").get(0)) + assertThat(he.getHeader("content-type").get(0)) .isNotNull().isEqualTo("application/json"); - Assertions.assertThat(he.statusCode()).isEqualTo(429); - Assertions.assertThat(he.getMessage()) + assertThat(he.statusCode()).isEqualTo(429); + assertThat(he.getMessage()) .isEqualTo("HTTP status code: 429; Message: Too Many Requests"); } } diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 48b7d617..2e378ecc 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -27,10 +27,10 @@ import java.util.Map; import java.util.Properties; -import com.influxdb.v3.client.write.WriteOptions; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; class ClientConfigTest { @@ -132,7 +132,7 @@ void fromConnectionString() throws MalformedURLException { } @Test - void fromConnectionString_longPrecision() throws MalformedURLException { + void fromConnectionStringLongPrecision() throws MalformedURLException { ClientConfig cfg; cfg = new ClientConfig.Builder().build("http://localhost:9999/?token=x&precision=nanosecond"); Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); @@ -212,7 +212,7 @@ void fromEnv() { } @Test - void fromEnv_longPrecision() { + void fromEnvLongPrecision() { Map baseEnv = Map.of( "INFLUX_HOST", "http://localhost:9999/", "INFLUX_TOKEN", "my-token" @@ -306,7 +306,7 @@ void fromSystemProperties() { } @Test - void fromSystemProperties_longPrecision() throws MalformedURLException { + void fromSystemPropertiesLongPrecision() throws MalformedURLException { Properties baseProps = new Properties(); baseProps.put("influx.host", "http://localhost:9999/"); baseProps.put("influx.token", "my-token"); diff --git a/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java index 136b42fb..2ff15aca 100644 --- a/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java @@ -1,9 +1,9 @@ package com.influxdb.v3.client.write; -import org.junit.jupiter.api.Test; - import java.util.Map; +import org.junit.jupiter.api.Test; + import static org.junit.jupiter.api.Assertions.assertEquals; public class WritePrecisionConverterTest { From 8285b61a4ac9ae85f04fac779661d2d470ba1a1f Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Mon, 26 May 2025 10:45:23 +0200 Subject: [PATCH 05/13] lint: add missing license headers --- .../client/write/WritePrecisionConverter.java | 21 +++++++++++++++++++ .../write/WritePrecisionConverterTest.java | 21 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java index 89935a03..e5e8e2e5 100644 --- a/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java +++ b/src/main/java/com/influxdb/v3/client/write/WritePrecisionConverter.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.write; public final class WritePrecisionConverter { diff --git a/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java index 2ff15aca..1748cad5 100644 --- a/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WritePrecisionConverterTest.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.write; import java.util.Map; From f111cf49b4e6aab6ee37c453755ac23ea1eb92a4 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Mon, 26 May 2025 11:41:10 +0200 Subject: [PATCH 06/13] docs: update CHANGELOG.md --- CHANGELOG.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b59306..58fb5160 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ ## 1.2.0 [unreleased] +### Features + +1. [#238](https://github.com/InfluxCommunity/influxdb3-java/pull/238): Support fast writes without waiting for WAL + persistence: + - New write option (`WriteOptions.noSync`) added: `true` value means faster write but without the confirmation that + the data was persisted. Default value: `false`. + - **Supported by self-managed InfluxDB 3 Core and Enterprise servers only!** + - Also configurable via connection string query parameter (`writeNoSync`). + - Also configurable via environment variable (`INFLUX_WRITE_NO_SYNC`). + - Long precision string values added from v3 HTTP API: `"nanosecond"`, `"microsecond"`, `"millisecond"`, + `"second"` ( + in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`). + ## 1.1.0 [2025-05-22] ### Features From fc6e19c08076ee279d8c5dc50d942f2713aa02e3 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 28 May 2025 09:53:02 +0200 Subject: [PATCH 07/13] fix: use WriteOptions from ClientConfig on write --- .../client/internal/InfluxDBClientImpl.java | 10 +- .../v3/client/InfluxDBClientWriteTest.java | 172 +++++++++++++++++- 2 files changed, 173 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index cc0613a2..e1cb25f3 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -81,6 +81,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient { private final RestClient restClient; private final FlightSqlClient flightSqlClient; + private final WriteOptions emptyWriteOptions; /** * Creates an instance using the specified config. @@ -110,11 +111,12 @@ public InfluxDBClientImpl(@Nonnull final ClientConfig config) { this.config = config; this.restClient = restClient != null ? restClient : new RestClient(config); this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config); + this.emptyWriteOptions = new WriteOptions(null); } @Override public void writeRecord(@Nullable final String record) { - writeRecord(record, WriteOptions.DEFAULTS); + writeRecord(record, emptyWriteOptions); } @Override @@ -128,7 +130,7 @@ public void writeRecord(@Nullable final String record, @Nonnull final WriteOptio @Override public void writeRecords(@Nonnull final List records) { - writeRecords(records, WriteOptions.DEFAULTS); + writeRecords(records, emptyWriteOptions); } @Override @@ -138,7 +140,7 @@ public void writeRecords(@Nonnull final List records, @Nonnull final Wri @Override public void writePoint(@Nullable final Point point) { - writePoint(point, WriteOptions.DEFAULTS); + writePoint(point, emptyWriteOptions); } @Override @@ -152,7 +154,7 @@ public void writePoint(@Nullable final Point point, @Nonnull final WriteOptions @Override public void writePoints(@Nonnull final List points) { - writePoints(points, WriteOptions.DEFAULTS); + writePoints(points, emptyWriteOptions); } @Override diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 7d8488e7..246b4a7c 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -23,15 +23,19 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import io.netty.handler.codec.http.HttpResponseStatus; +import okhttp3.HttpUrl; import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -223,10 +227,8 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, - () -> { - client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()); - } + () -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) ); assertThat(mockServer.getRequestCount()).isEqualTo(1); @@ -242,6 +244,166 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { + " (supported by InfluxDB 3 Core/Enterprise servers only)."); } + @Test + void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecords(List.of("mem,tag=one value=1.0")); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecords(List.of("mem,tag=one value=1.0")); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoint(point); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoint(point); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + @Test + void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoints(List.of(point)); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + } + + @Test + void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writePrecision(WritePrecision.S) + .writeNoSync(true) + .gzipThreshold(1) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + Point point = new Point("mem"); + point.setTag("tag", "one"); + point.setField("value", 1.0); + client.writePoints(List.of(point)); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + } + + private void checkWriteCalled(String expectedPath, String expectedDB, String expectedPrecision, + boolean expectedNoSync, boolean expectedGzip) throws InterruptedException { + RecordedRequest request = assertThatServerRequested(); + HttpUrl requestUrl = request.getRequestUrl(); + assertThat(requestUrl).isNotNull(); + assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); + if (expectedNoSync) { + assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); + } else { + assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); + } + assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision); + if (expectedNoSync) { + assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true"); + } else { + assertThat(requestUrl.queryParameter("no_sync")).isNull(); + } + if (expectedGzip) { + assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); + } else { + assertThat(request.getHeader("Content-Encoding")).isNull(); + } + } + + @NotNull + private RecordedRequest assertThatServerRequested() throws InterruptedException { + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + return request; + } + @Test void allParameterSpecified() throws InterruptedException { mockServer.enqueue(createResponse(200)); @@ -344,7 +506,7 @@ void defaultTags() throws InterruptedException { } @Test - public void retryHandled429Test() throws InterruptedException { + public void retryHandled429Test() { mockServer.enqueue(createResponse(429) .setBody("{ \"message\" : \"Too Many Requests\" }") .setHeader("retry-after", "42") From 86cab270c98a030637aae2d8b26be576d1d07111 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 28 May 2025 09:56:09 +0200 Subject: [PATCH 08/13] lint: fix lint errors --- .../java/com/influxdb/v3/client/InfluxDBClientWriteTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 246b4a7c..45d5d8c4 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -372,8 +372,8 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); } - private void checkWriteCalled(String expectedPath, String expectedDB, String expectedPrecision, - boolean expectedNoSync, boolean expectedGzip) throws InterruptedException { + private void checkWriteCalled(final String expectedPath, final String expectedDB, final String expectedPrecision, + final boolean expectedNoSync, final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getRequestUrl(); assertThat(requestUrl).isNotNull(); From fa6d10a8f881a93f842e9750c234b0a340b82a8e Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 28 May 2025 10:09:13 +0200 Subject: [PATCH 09/13] lint: fix lint errors --- .../v3/client/InfluxDBClientWriteTest.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 45d5d8c4..3a4240e7 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -372,8 +372,9 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); } - private void checkWriteCalled(final String expectedPath, final String expectedDB, final String expectedPrecision, - final boolean expectedNoSync, final boolean expectedGzip) throws InterruptedException { + private void checkWriteCalled(final String expectedPath, final String expectedDB, + final String expectedPrecision, final boolean expectedNoSync, + final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getRequestUrl(); assertThat(requestUrl).isNotNull(); @@ -488,8 +489,8 @@ void defaultTags() throws InterruptedException { mockServer.enqueue(createResponse(200)); Point point = Point.measurement("mem") - .setTag("tag", "one") - .setField("value", 1.0); + .setTag("tag", "one") + .setField("value", 1.0); Map defaultTags = Map.of("unit", "U2", "model", "M5"); @@ -508,14 +509,14 @@ void defaultTags() throws InterruptedException { @Test public void retryHandled429Test() { mockServer.enqueue(createResponse(429) - .setBody("{ \"message\" : \"Too Many Requests\" }") - .setHeader("retry-after", "42") - .setHeader("content-type", "application/json") + .setBody("{ \"message\" : \"Too Many Requests\" }") + .setHeader("retry-after", "42") + .setHeader("content-type", "application/json") ); Point point = Point.measurement("mem") - .setTag("tag", "one") - .setField("value", 1.0); + .setTag("tag", "one") + .setField("value", 1.0); Throwable thrown = catchThrowable(() -> client.writePoint(point)); @@ -524,11 +525,11 @@ public void retryHandled429Test() { InfluxDBApiHttpException he = (InfluxDBApiHttpException) thrown; assertThat(he.headers()).isNotNull(); assertThat(he.getHeader("retry-after").get(0)) - .isNotNull().isEqualTo("42"); + .isNotNull().isEqualTo("42"); assertThat(he.getHeader("content-type").get(0)) - .isNotNull().isEqualTo("application/json"); + .isNotNull().isEqualTo("application/json"); assertThat(he.statusCode()).isEqualTo(429); assertThat(he.getMessage()) - .isEqualTo("HTTP status code: 429; Message: Too Many Requests"); + .isEqualTo("HTTP status code: 429; Message: Too Many Requests"); } } From 2764787633d56a5334f6d23de33c96155ddbfd66 Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 28 May 2025 10:20:10 +0200 Subject: [PATCH 10/13] docs: update CHANGELOG.md --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58fb5160..82cd3382 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,17 @@ `"second"` ( in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`). +### Bug Fixes + +1. [#239](https://github.com/InfluxCommunity/influxdb3-java/pull/239): Use write options from `ClientConfig` in + `InfluxDBClientImpl` write methods: + ```java + public void writeRecord(@Nullable final String record); + public void writeRecords(@Nonnull final List records); + public void writePoint(@Nullable final Point point); + public void writePoints(@Nonnull final List points); + ``` + ## 1.1.0 [2025-05-22] ### Features From 9204c8a719812212923485ca605601a9c0f55a7c Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Tue, 3 Jun 2025 23:56:57 +0200 Subject: [PATCH 11/13] chore: revert unwanted whitespace changes --- .../v3/client/InfluxDBClientWriteTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 3a4240e7..49e513da 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -489,8 +489,8 @@ void defaultTags() throws InterruptedException { mockServer.enqueue(createResponse(200)); Point point = Point.measurement("mem") - .setTag("tag", "one") - .setField("value", 1.0); + .setTag("tag", "one") + .setField("value", 1.0); Map defaultTags = Map.of("unit", "U2", "model", "M5"); @@ -509,14 +509,14 @@ void defaultTags() throws InterruptedException { @Test public void retryHandled429Test() { mockServer.enqueue(createResponse(429) - .setBody("{ \"message\" : \"Too Many Requests\" }") - .setHeader("retry-after", "42") - .setHeader("content-type", "application/json") + .setBody("{ \"message\" : \"Too Many Requests\" }") + .setHeader("retry-after", "42") + .setHeader("content-type", "application/json") ); Point point = Point.measurement("mem") - .setTag("tag", "one") - .setField("value", 1.0); + .setTag("tag", "one") + .setField("value", 1.0); Throwable thrown = catchThrowable(() -> client.writePoint(point)); @@ -525,11 +525,11 @@ public void retryHandled429Test() { InfluxDBApiHttpException he = (InfluxDBApiHttpException) thrown; assertThat(he.headers()).isNotNull(); assertThat(he.getHeader("retry-after").get(0)) - .isNotNull().isEqualTo("42"); + .isNotNull().isEqualTo("42"); assertThat(he.getHeader("content-type").get(0)) - .isNotNull().isEqualTo("application/json"); + .isNotNull().isEqualTo("application/json"); assertThat(he.statusCode()).isEqualTo(429); assertThat(he.getMessage()) - .isEqualTo("HTTP status code: 429; Message: Too Many Requests"); + .isEqualTo("HTTP status code: 429; Message: Too Many Requests"); } } From bc4a804853e9e75dc2a5cecfb2fffe0b7a86cdaf Mon Sep 17 00:00:00 2001 From: Jan Simon Date: Wed, 4 Jun 2025 00:02:15 +0200 Subject: [PATCH 12/13] lint: fix lint errors --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb022ab7..358a7bcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ 1. [#239](https://github.com/InfluxCommunity/influxdb3-java/pull/239): Use write options from `ClientConfig` in `InfluxDBClientImpl` write methods: + ```java public void writeRecord(@Nullable final String record); public void writeRecords(@Nonnull final List records); From 83e3fddbfec3bab12b748b3d502b63986cc6873d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 4 Jun 2025 13:50:34 +0200 Subject: [PATCH 13/13] test: add skip new integration test if ENVARS are not defined. --- src/test/java/com/influxdb/v3/client/integration/E2ETest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index eb00e382..e4069233 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -278,6 +278,9 @@ public void testQueryRows() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test public void testQueryRowWithOptions() throws Exception { try (InfluxDBClient client = InfluxDBClient.getInstance(