Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@
`"second"` (
in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`).

### Bug Fixes

1. [#239](https://github.com/InfluxCommunity/influxdb3-java/pull/239): Use write options from `ClientConfig` in
`InfluxDBClientImpl` write methods:

```java
public void writeRecord(@Nullable final String record);
public void writeRecords(@Nonnull final List<String> records);
public void writePoint(@Nullable final Point point);
public void writePoints(@Nonnull final List<Point> points);
```

## 1.1.0 [2025-05-22]

### Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient {

private final RestClient restClient;
private final FlightSqlClient flightSqlClient;
private final WriteOptions emptyWriteOptions;

/**
* Creates an instance using the specified config.
Expand Down Expand Up @@ -110,11 +111,12 @@ public InfluxDBClientImpl(@Nonnull final ClientConfig config) {
this.config = config;
this.restClient = restClient != null ? restClient : new RestClient(config);
this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config);
this.emptyWriteOptions = new WriteOptions(null);
}

@Override
public void writeRecord(@Nullable final String record) {
writeRecord(record, WriteOptions.DEFAULTS);
writeRecord(record, emptyWriteOptions);
}

@Override
Expand All @@ -128,7 +130,7 @@ public void writeRecord(@Nullable final String record, @Nonnull final WriteOptio

@Override
public void writeRecords(@Nonnull final List<String> records) {
writeRecords(records, WriteOptions.DEFAULTS);
writeRecords(records, emptyWriteOptions);
}

@Override
Expand All @@ -138,7 +140,7 @@ public void writeRecords(@Nonnull final List<String> records, @Nonnull final Wri

@Override
public void writePoint(@Nullable final Point point) {
writePoint(point, WriteOptions.DEFAULTS);
writePoint(point, emptyWriteOptions);
}

@Override
Expand All @@ -152,7 +154,7 @@ public void writePoint(@Nullable final Point point, @Nonnull final WriteOptions

@Override
public void writePoints(@Nonnull final List<Point> points) {
writePoints(points, WriteOptions.DEFAULTS);
writePoints(points, emptyWriteOptions);
}

@Override
Expand Down
173 changes: 168 additions & 5 deletions src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.netty.handler.codec.http.HttpResponseStatus;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.RecordedRequest;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.write.WriteOptions;
import com.influxdb.v3.client.write.WritePrecision;

Expand Down Expand Up @@ -223,10 +227,8 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code()));

InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class,
() -> {
client.writeRecord("mem,tag=one value=1.0",
new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build());
}
() -> client.writeRecord("mem,tag=one value=1.0",
new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build())
);

assertThat(mockServer.getRequestCount()).isEqualTo(1);
Expand All @@ -242,6 +244,167 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
+ " (supported by InfluxDB 3 Core/Enterprise servers only).");
}

@Test
void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
client.writeRecord("mem,tag=one value=1.0");
}

checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
}

@Test
void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.writePrecision(WritePrecision.S)
.writeNoSync(true)
.gzipThreshold(1)
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
client.writeRecord("mem,tag=one value=1.0");
}

checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
}

@Test
void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}

checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
}

@Test
void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.writePrecision(WritePrecision.S)
.writeNoSync(true)
.gzipThreshold(1)
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}

checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
}

@Test
void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
Point point = new Point("mem");
point.setTag("tag", "one");
point.setField("value", 1.0);
client.writePoint(point);
}

checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
}

@Test
void writePointWithDefaultWriteOptionsCustomConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.writePrecision(WritePrecision.S)
.writeNoSync(true)
.gzipThreshold(1)
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
Point point = new Point("mem");
point.setTag("tag", "one");
point.setField("value", 1.0);
client.writePoint(point);
}

checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
}

@Test
void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
Point point = new Point("mem");
point.setTag("tag", "one");
point.setField("value", 1.0);
client.writePoints(List.of(point));
}

checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
}

@Test
void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception {
mockServer.enqueue(createResponse(200));

ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
.writePrecision(WritePrecision.S)
.writeNoSync(true)
.gzipThreshold(1)
.build();
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
Point point = new Point("mem");
point.setTag("tag", "one");
point.setField("value", 1.0);
client.writePoints(List.of(point));
}

checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
}

private void checkWriteCalled(final String expectedPath, final String expectedDB,
final String expectedPrecision, final boolean expectedNoSync,
final boolean expectedGzip) throws InterruptedException {
RecordedRequest request = assertThatServerRequested();
HttpUrl requestUrl = request.getRequestUrl();
assertThat(requestUrl).isNotNull();
assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath);
if (expectedNoSync) {
assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB);
} else {
assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB);
}
assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision);
if (expectedNoSync) {
assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true");
} else {
assertThat(requestUrl.queryParameter("no_sync")).isNull();
}
if (expectedGzip) {
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
} else {
assertThat(request.getHeader("Content-Encoding")).isNull();
}
}

@NotNull
private RecordedRequest assertThatServerRequested() throws InterruptedException {
assertThat(mockServer.getRequestCount()).isEqualTo(1);
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
return request;
}

@Test
void allParameterSpecified() throws InterruptedException {
mockServer.enqueue(createResponse(200));
Expand Down Expand Up @@ -344,7 +507,7 @@ void defaultTags() throws InterruptedException {
}

@Test
public void retryHandled429Test() throws InterruptedException {
public void retryHandled429Test() {
mockServer.enqueue(createResponse(429)
.setBody("{ \"message\" : \"Too Many Requests\" }")
.setHeader("retry-after", "42")
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/com/influxdb/v3/client/integration/E2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ public void testQueryRows() throws Exception {
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testQueryRowWithOptions() throws Exception {
try (InfluxDBClient client = InfluxDBClient.getInstance(
Expand Down
Loading