From ea3f7c6e62f324c08a70e5a07adb40072a17b718 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 28 Mar 2025 15:06:43 +0700 Subject: [PATCH 01/23] feat: set max rpc message size --- .../v3/client/config/ClientConfig.java | 37 +++++- .../v3/client/internal/FlightSqlClient.java | 15 ++- .../v3/client/config/ClientConfigTest.java | 107 +++++++++++++++++- 3 files changed, 147 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index fc248e30..f8676c8e 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -57,10 +57,11 @@ *
  • disableServerCertificateValidation - * disable server certificate validation for HTTPS connections *
  • - *
  • proxyUrl - Proxy url for query api and write api
  • + *
  • proxyUrl - proxy url for query api and write api
  • *
  • authenticator - HTTP proxy authenticator
  • *
  • headers - headers to be added to requests
  • - *
  • sslRootsFilePath - Path to the stored certificates file in PEM format
  • + *
  • sslRootsFilePath - path to the stored certificates file in PEM format
  • + *
  • maxInboundMessageSize - RPC maximum inbound message size that the client can receive
  • * *

    * If you want to create a client with custom configuration, you can use following code: @@ -102,6 +103,7 @@ public final class ClientConfig { private final Authenticator authenticator; private final Map headers; private final String sslRootsFilePath; + private final Integer maxInboundMessageSize; /** * Deprecated use {@link #proxyUrl}. @@ -240,6 +242,16 @@ public String getProxyUrl() { return proxyUrl; } + /** + * Gets rpc max message size client can receive. + * + * @return the size in Integer, may be null + */ + @Nullable + public Integer getMaxInboundMessageSize() { + return maxInboundMessageSize; + } + /** * Gets certificates file path. * @@ -303,7 +315,8 @@ public boolean equals(final Object o) { && Objects.equals(proxyUrl, that.proxyUrl) && Objects.equals(authenticator, that.authenticator) && Objects.equals(headers, that.headers) - && Objects.equals(sslRootsFilePath, that.sslRootsFilePath); + && Objects.equals(sslRootsFilePath, that.sslRootsFilePath) + && Objects.equals(maxInboundMessageSize, that.maxInboundMessageSize); } @Override @@ -312,7 +325,7 @@ public int hashCode() { database, writePrecision, gzipThreshold, timeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, - defaultTags, sslRootsFilePath); + defaultTags, sslRootsFilePath, maxInboundMessageSize); } @Override @@ -332,6 +345,7 @@ public String toString() { .add("headers=" + headers) .add("defaultTags=" + defaultTags) .add("sslRootsFilePath=" + sslRootsFilePath) + .add("maxInboundMessageSize=" + maxInboundMessageSize) .toString(); } @@ -357,6 +371,7 @@ public static final class Builder { private Authenticator authenticator; private Map headers; private String sslRootsFilePath; + private Integer maxInboundMessageSize; /** * Sets the URL of the InfluxDB server. @@ -589,6 +604,19 @@ public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) { return this; } + /** + * Set rpc max message size client can receive. Default is 'null'. + * + * @param maxInboundMessageSize The size in Integer + * @return this + */ + @Nonnull + public Builder maxInboundMessageSize(@Nullable final Integer maxInboundMessageSize) { + + this.maxInboundMessageSize = maxInboundMessageSize; + return this; + } + /** * Build an instance of {@code ClientConfig}. * @@ -728,5 +756,6 @@ private ClientConfig(@Nonnull final Builder builder) { authenticator = builder.authenticator; headers = builder.headers; sslRootsFilePath = builder.sslRootsFilePath; + maxInboundMessageSize = builder.maxInboundMessageSize; } } diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index e1b37298..fcda3665 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -141,11 +141,10 @@ public void close() throws Exception { @Nonnull private FlightClient createFlightClient(@Nonnull final ClientConfig config) { - Location location = createLocation(config); + URI uri = createLocation(config).getUri(); + final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()); - final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forTarget(location.getUri().getHost()); - - if (LocationSchemes.GRPC_TLS.equals(location.getUri().getScheme())) { + if (LocationSchemes.GRPC_TLS.equals(uri.getScheme())) { nettyChannelBuilder.useTransportSecurity(); SslContext nettySslContext = createNettySslContext(config); @@ -163,9 +162,13 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { LOG.warn("proxy property in ClientConfig will not work in query api, use proxyUrl property instead"); } + int maxInboundMessageSize = config.getMaxInboundMessageSize() != null + ? + config.getMaxInboundMessageSize() + : Integer.MAX_VALUE; nettyChannelBuilder.maxTraceEvents(0) - .maxInboundMessageSize(Integer.MAX_VALUE) - .maxInboundMetadataSize(Integer.MAX_VALUE); + .maxInboundMetadataSize(Integer.MAX_VALUE) + .maxInboundMessageSize(maxInboundMessageSize); return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build()); } diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 319d335c..2732a927 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -22,14 +22,36 @@ package com.influxdb.v3.client.config; import java.net.MalformedURLException; +import java.net.URI; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.write.WritePrecision; class ClientConfigTest { @@ -63,9 +85,9 @@ void hashConfig() { Assertions.assertThat(config.hashCode()).isEqualTo(configBuilder.build().hashCode()); Assertions.assertThat(config.hashCode()) - .isNotEqualTo(configBuilder.database("database").build().hashCode()); + .isNotEqualTo(configBuilder.database("database").build().hashCode()); Assertions.assertThat(config.hashCode()) - .isNotEqualTo(configBuilder.defaultTags(defaultTags).build().hashCode()); + .isNotEqualTo(configBuilder.defaultTags(defaultTags).build().hashCode()); } @Test @@ -251,4 +273,85 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); } + + @Test + void maxInboundMessageSize() throws Exception { + URI uri = URI.create("http://127.0.0.1:33333"); + int rowCount = 100; + try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + // Set very small message size for testing + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig.Builder builder = new ClientConfig.Builder() + .host(host) + .database("test") + .maxInboundMessageSize(200); + String query = "Select * from \"nothing\""; + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { + try (Stream points = influxDBClient.queryPoints(query)) { + FlightRuntimeException exception = Assertions.catchThrowableOfType( + FlightRuntimeException.class, + points::count); + Assertions.assertThat(exception.status().code()).isEqualTo(CallStatus.RESOURCE_EXHAUSTED.code()); + } + } + + // Set large message size case + builder.maxInboundMessageSize(1024 * 1024 * 1024); + try (InfluxDBClient influxDBClient1 = InfluxDBClient.getInstance(builder.build())) { + Assertions.assertThatNoException().isThrownBy(() -> { + try (Stream points = influxDBClient1.queryPoints(query)) { + Assertions.assertThat(points.count()).isEqualTo(rowCount); + } + }); + } + } + } + + private FlightServer simpleFlightServer(@Nonnull final URI uri, + @Nonnull final BufferAllocator allocator, + @Nonnull final NoOpFlightProducer producer) throws Exception { + Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort()); + return FlightServer.builder(allocator, location, producer).build(); + } + + private NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) { + return new NoOpFlightProducer() { + @Override + public void getStream(final CallContext context, + final Ticket ticket, + final ServerStreamListener listener) { + listener.start(vectorSchemaRoot); + if (listener.isReady()) { + listener.putNext(); + } + listener.completed(); + } + }; + } + + private VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) { + List fields = new ArrayList<>(); + for (int i = 0; i < fieldCount; i++) { + Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null); + fields.add(field); + } + + Schema schema = new Schema(fields); + VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); + for (Field field : fields) { + VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field); + vector.allocateNew(rowCount); + for (int i = 0; i < rowCount; i++) { + vector.set(i, "Value".getBytes(StandardCharsets.UTF_8)); + } + vectorSchemaRoot.setRowCount(rowCount); + } + + return vectorSchemaRoot; + } } From 733f29c827616db3c6c65c94306fdeadd0e75d00 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 28 Mar 2025 15:09:39 +0700 Subject: [PATCH 02/23] chore: CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2312896a..3c979fdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ ### Features 1. [#229](https://github.com/InfluxCommunity/influxdb3-java/pull/229): Support proxy and custom ssl root certificates -2. [#233](https://github.com/InfluxCommunity/influxdb3-java/pull/233): More detailed documentation about timestamp handling for query and write functions +2. [#232](https://github.com/InfluxCommunity/influxdb3-java/pull/232): Allow set rpc max message size through maxInboundMessageSize in ClientConfig +3. [#233](https://github.com/InfluxCommunity/influxdb3-java/pull/233): More detailed documentation about timestamp handling for query and write functions ## 1.0.0 [2024-12-11] From 888b223b8c22387b5cf5015864a57662cceae660 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 28 Mar 2025 15:17:48 +0700 Subject: [PATCH 03/23] refactor: refactor code --- .../java/com/influxdb/v3/client/config/ClientConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 2732a927..728207a1 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -349,8 +349,8 @@ private VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final in for (int i = 0; i < rowCount; i++) { vector.set(i, "Value".getBytes(StandardCharsets.UTF_8)); } - vectorSchemaRoot.setRowCount(rowCount); } + vectorSchemaRoot.setRowCount(rowCount); return vectorSchemaRoot; } From 397bce612f31acb62c7efd3a90922a7ed7569ce1 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 1 Apr 2025 18:20:06 +0700 Subject: [PATCH 04/23] feat: set grpc options --- .../v3/client/config/ClientConfig.java | 33 +-- .../v3/client/internal/FlightSqlClient.java | 27 +- .../v3/client/internal/GrpcCallOption.java | 278 ++++++++++++++++++ .../client/internal/InfluxDBClientImpl.java | 5 +- .../v3/client/query/QueryOptions.java | 11 + .../v3/client/config/ClientConfigTest.java | 103 ------- .../client/internal/FlightSqlClientTest.java | 38 +++ .../v3/client/query/QueryOptionsTest.java | 169 +++++++++++ 8 files changed, 521 insertions(+), 143 deletions(-) create mode 100644 src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index f8676c8e..ad6a9d4f 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -61,7 +61,6 @@ *

  • authenticator - HTTP proxy authenticator
  • *
  • headers - headers to be added to requests
  • *
  • sslRootsFilePath - path to the stored certificates file in PEM format
  • - *
  • maxInboundMessageSize - RPC maximum inbound message size that the client can receive
  • * *

    * If you want to create a client with custom configuration, you can use following code: @@ -103,7 +102,6 @@ public final class ClientConfig { private final Authenticator authenticator; private final Map headers; private final String sslRootsFilePath; - private final Integer maxInboundMessageSize; /** * Deprecated use {@link #proxyUrl}. @@ -242,16 +240,6 @@ public String getProxyUrl() { return proxyUrl; } - /** - * Gets rpc max message size client can receive. - * - * @return the size in Integer, may be null - */ - @Nullable - public Integer getMaxInboundMessageSize() { - return maxInboundMessageSize; - } - /** * Gets certificates file path. * @@ -315,8 +303,7 @@ public boolean equals(final Object o) { && Objects.equals(proxyUrl, that.proxyUrl) && Objects.equals(authenticator, that.authenticator) && Objects.equals(headers, that.headers) - && Objects.equals(sslRootsFilePath, that.sslRootsFilePath) - && Objects.equals(maxInboundMessageSize, that.maxInboundMessageSize); + && Objects.equals(sslRootsFilePath, that.sslRootsFilePath); } @Override @@ -325,7 +312,7 @@ public int hashCode() { database, writePrecision, gzipThreshold, timeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, - defaultTags, sslRootsFilePath, maxInboundMessageSize); + defaultTags, sslRootsFilePath); } @Override @@ -345,7 +332,6 @@ public String toString() { .add("headers=" + headers) .add("defaultTags=" + defaultTags) .add("sslRootsFilePath=" + sslRootsFilePath) - .add("maxInboundMessageSize=" + maxInboundMessageSize) .toString(); } @@ -371,7 +357,6 @@ public static final class Builder { private Authenticator authenticator; private Map headers; private String sslRootsFilePath; - private Integer maxInboundMessageSize; /** * Sets the URL of the InfluxDB server. @@ -604,19 +589,6 @@ public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) { return this; } - /** - * Set rpc max message size client can receive. Default is 'null'. - * - * @param maxInboundMessageSize The size in Integer - * @return this - */ - @Nonnull - public Builder maxInboundMessageSize(@Nullable final Integer maxInboundMessageSize) { - - this.maxInboundMessageSize = maxInboundMessageSize; - return this; - } - /** * Build an instance of {@code ClientConfig}. * @@ -756,6 +728,5 @@ private ClientConfig(@Nonnull final Builder builder) { authenticator = builder.authenticator; headers = builder.headers; sslRootsFilePath = builder.sslRootsFilePath; - maxInboundMessageSize = builder.maxInboundMessageSize; } } diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index fcda3665..ca5bb738 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -27,11 +27,13 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; import java.util.stream.Stream; @@ -49,6 +51,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightGrpcUtils; import org.apache.arrow.flight.FlightStream; @@ -106,7 +109,8 @@ Stream execute(@Nonnull final String query, @Nonnull final String database, @Nonnull final QueryType queryType, @Nonnull final Map queryParameters, - @Nonnull final Map headers) { + @Nonnull final Map headers, + final CallOption... callOption) { Map ticketData = new HashMap<>() {{ put("database", database); @@ -126,8 +130,10 @@ Stream execute(@Nonnull final String query, } HeaderCallOption headerCallOption = metadataHeader(headers); + CallOption[] callOptions = concatCallOptions(callOption, headerCallOption); + Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - FlightStream stream = client.getStream(ticket, headerCallOption); + FlightStream stream = client.getStream(ticket, callOptions); FlightSqlIterator iterator = new FlightSqlIterator(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); @@ -162,13 +168,8 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { LOG.warn("proxy property in ClientConfig will not work in query api, use proxyUrl property instead"); } - int maxInboundMessageSize = config.getMaxInboundMessageSize() != null - ? - config.getMaxInboundMessageSize() - : Integer.MAX_VALUE; nettyChannelBuilder.maxTraceEvents(0) - .maxInboundMetadataSize(Integer.MAX_VALUE) - .maxInboundMessageSize(maxInboundMessageSize); + .maxInboundMetadataSize(Integer.MAX_VALUE); return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build()); } @@ -236,6 +237,16 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina }; } + @Nullable + CallOption[] concatCallOptions(@Nullable final CallOption[] base, final CallOption... callOption) { + if (base == null || base.length == 0) { + return callOption; + } + List results = new ArrayList<>(List.of(base)); + Arrays.stream(callOption).filter(Objects::nonNull).forEach(results::add); + return results.toArray(new CallOption[0]); + } + private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java new file mode 100644 index 00000000..9d62c52f --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -0,0 +1,278 @@ +package com.influxdb.v3.client.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.grpc.CompressorRegistry; +import io.grpc.Deadline; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.AbstractStub; +import org.apache.arrow.flight.CallOption; + +/** + * The collection of runtime options for a new RPC call. + */ +public final class GrpcCallOption { + + private final Deadline deadLineAfter; + private final Executor executor; + private final String compressorName; + private final Boolean waitForReady; + private final Integer maxInboundMessageSize; + private final Integer maxOutboundMessageSize; + private final CallOption[] callOptionCallback; + + private GrpcCallOption(@Nonnull final Builder builder) { + this.deadLineAfter = builder.deadLineAfter; + this.executor = builder.executor; + this.compressorName = builder.compressorName; + this.waitForReady = builder.waitForReady; + this.maxInboundMessageSize = builder.maxInboundMessageSize; + this.maxOutboundMessageSize = builder.maxOutboundMessageSize; + this.callOptionCallback = builder.callOptions.toArray(new CallOption[0]); + } + + /** + * Returns the deadline that is after the given duration from now. + * + * @return the Deadline object + */ + @Nullable + public Deadline getDeadlineAfter() { + return deadLineAfter; + } + + /** + * Returns the Executor to be used instead of default. + * + * @return the Executor + */ + @Nullable + public Executor getExecutor() { + return executor; + } + + /** + * Returns the compressor's name. + * + * @return the compressor's name + */ + @Nullable + public String getCompressorName() { + return compressorName; + } + + /** + * Returns the wait for ready flag + * + * @return the wait for ready flag + */ + @Nullable + public Boolean getWaitForReady() { + return waitForReady; + } + + /** + * Returns the maximum allowed message size acceptable from the remote peer. + * + * @return the maximum message size receive allowed + */ + @Nullable + public Integer getMaxInboundMessageSize() { + return maxInboundMessageSize; + } + + /** + * Returns the maximum allowed message size acceptable to send the remote peer. + * + * @return the maximum message size send allowed + */ + @Nullable + public Integer getMaxOutboundMessageSize() { + return maxOutboundMessageSize; + } + + /** + * Get the CallOption callback list which is use when setting + * the grpc CallOption. + * + * @return the CallOption list + */ + @Nonnull + public CallOption[] getCallOptionCallback() { + return callOptionCallback; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + GrpcCallOption that = (GrpcCallOption) o; + return Objects.equals(deadLineAfter, that.deadLineAfter) + && Objects.equals(executor, that.executor) + && Objects.equals(compressorName, that.compressorName) + && Objects.equals(waitForReady, that.waitForReady) + && Objects.equals(maxInboundMessageSize, that.maxInboundMessageSize) + && Objects.equals(maxOutboundMessageSize, that.maxOutboundMessageSize); + } + + @Override + public int hashCode() { + return Objects.hash(deadLineAfter, + executor, + compressorName, + waitForReady, + maxInboundMessageSize, + maxOutboundMessageSize + ); + } + + @Override + public String toString() { + return "GrpcCallOption{" + + "deadLineAfter=" + deadLineAfter + + ", executor=" + executor + + ", compressorName='" + compressorName + '\'' + + ", waitForReady=" + waitForReady + + ", maxInboundMessageSize=" + maxInboundMessageSize + + ", maxOutboundMessageSize=" + maxOutboundMessageSize + + '}'; + } + + public static final class Builder { + private Deadline deadLineAfter; + private Executor executor; + private String compressorName; + private Boolean waitForReady; + private Integer maxInboundMessageSize; + private Integer maxOutboundMessageSize; + private final List callOptions = new ArrayList<>(); + + /** + * Sets a deadline that is after the given {@code duration} from + * now. + * @param duration The duration + * @param timeUnit The time unit + * @return this + */ + public Builder withDeadlineAfter(final long duration, @Nonnull final TimeUnit timeUnit) { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withDeadlineAfter(duration, timeUnit); + } + }; + this.deadLineAfter = Deadline.after(duration, timeUnit); + callOptions.add(callOption); + return this; + } + + /** + * Sets an {@code executor} to be used instead of the default + * executor specified with {@link ManagedChannelBuilder#executor}. + * @param executor The executor + * @return this + */ + public Builder withExecutor(@Nonnull final Executor executor) { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withExecutor(executor); + } + }; + this.executor = executor; + callOptions.add(callOption); + return this; + } + + /** + * Sets the compression to use for the call. The compressor must be a valid name known in the + * {@link CompressorRegistry}. By default, the "gzip" compressor will be available. + * + *

    It is only safe to call this if the server supports the compression format chosen. There is + * no negotiation performed; if the server does not support the compression chosen, the call will + * fail. + * @param compressorName The compressor name + * @return this + */ + public Builder withCompressorName(@Nonnull final String compressorName) { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withCompression(compressorName); + } + }; + this.compressorName = compressorName; + callOptions.add(callOption); + return this; + } + + /** + * Enables + * 'wait for ready' for the call. Wait-for-ready queues the RPC until a connection is + * available. This may dramatically increase the latency of the RPC, but avoids failing + * "unnecessarily." The default queues the RPC until an attempt to connect has completed, but + * fails RPCs without sending them if unable to connect. + * @return this + */ + public Builder withWaitForReady() { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withWaitForReady(); + } + }; + this.waitForReady = true; + callOptions.add(callOption); + return this; + } + + /** + * Sets the maximum allowed message size acceptable from the remote peer. If unset, this will + * default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}. + * @param maxInboundMessageSize The max receive message size + * @return this + */ + public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessageSize) { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withMaxInboundMessageSize(maxInboundMessageSize); + } + }; + this.maxInboundMessageSize = maxInboundMessageSize; + callOptions.add(callOption); + return this; + } + + /** + * Sets the maximum allowed message size acceptable sent to the remote peer. + * @param maxOutboundMessageSize The maximum message send size + * @return this + */ + public Builder withMaxOutboundMessageSize(@Nonnull final Integer maxOutboundMessageSize) { + var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withMaxOutboundMessageSize(maxOutboundMessageSize); + } + }; + this.maxOutboundMessageSize = maxOutboundMessageSize; + callOptions.add(callOption); + return this; + } + + /** + * Build an instance of GrpcCallOption. + * + * @return the GrpcCallOption instance + */ + public GrpcCallOption build() { + return new GrpcCallOption(this); + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 6d67b7d0..1645a958 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import io.netty.handler.codec.http.HttpMethod; +import org.apache.arrow.flight.CallOption; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -341,7 +342,9 @@ private Stream queryData(@Nonnull final String query, } }); - return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe()); + GrpcCallOption grpcCallOption = options.grpcCallOption(); + CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptionCallback() : null; + return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe(), callOptions); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 8ddf88e8..ec15d62a 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -28,6 +28,7 @@ import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.Arguments; +import com.influxdb.v3.client.internal.GrpcCallOption; /** * Query API options. @@ -61,6 +62,7 @@ public final class QueryOptions { private final String database; private final QueryType queryType; private final Map headers; + private GrpcCallOption grpcCallOption; /** * Construct QueryAPI options. The query type is set to SQL. @@ -144,6 +146,15 @@ public Map headersSafe() { return headers; } + public void setGrpcCallOption(@Nonnull final GrpcCallOption grpcCallOption) { + this.grpcCallOption = grpcCallOption; + } + + @Nullable + public GrpcCallOption grpcCallOption() { + return grpcCallOption; + } + private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 728207a1..d2d772f4 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -22,36 +22,14 @@ package com.influxdb.v3.client.config; import java.net.MalformedURLException; -import java.net.URI; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Stream; -import javax.annotation.Nonnull; -import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.FlightRuntimeException; -import org.apache.arrow.flight.FlightServer; -import org.apache.arrow.flight.Location; -import org.apache.arrow.flight.NoOpFlightProducer; -import org.apache.arrow.flight.Ticket; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.FieldType; -import org.apache.arrow.vector.types.pojo.Schema; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import com.influxdb.v3.client.InfluxDBClient; -import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.write.WritePrecision; class ClientConfigTest { @@ -273,85 +251,4 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); } - - @Test - void maxInboundMessageSize() throws Exception { - URI uri = URI.create("http://127.0.0.1:33333"); - int rowCount = 100; - try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) - ) { - flightServer.start(); - - // Set very small message size for testing - String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); - ClientConfig.Builder builder = new ClientConfig.Builder() - .host(host) - .database("test") - .maxInboundMessageSize(200); - String query = "Select * from \"nothing\""; - try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { - try (Stream points = influxDBClient.queryPoints(query)) { - FlightRuntimeException exception = Assertions.catchThrowableOfType( - FlightRuntimeException.class, - points::count); - Assertions.assertThat(exception.status().code()).isEqualTo(CallStatus.RESOURCE_EXHAUSTED.code()); - } - } - - // Set large message size case - builder.maxInboundMessageSize(1024 * 1024 * 1024); - try (InfluxDBClient influxDBClient1 = InfluxDBClient.getInstance(builder.build())) { - Assertions.assertThatNoException().isThrownBy(() -> { - try (Stream points = influxDBClient1.queryPoints(query)) { - Assertions.assertThat(points.count()).isEqualTo(rowCount); - } - }); - } - } - } - - private FlightServer simpleFlightServer(@Nonnull final URI uri, - @Nonnull final BufferAllocator allocator, - @Nonnull final NoOpFlightProducer producer) throws Exception { - Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort()); - return FlightServer.builder(allocator, location, producer).build(); - } - - private NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) { - return new NoOpFlightProducer() { - @Override - public void getStream(final CallContext context, - final Ticket ticket, - final ServerStreamListener listener) { - listener.start(vectorSchemaRoot); - if (listener.isReady()) { - listener.putNext(); - } - listener.completed(); - } - }; - } - - private VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) { - List fields = new ArrayList<>(); - for (int i = 0; i < fieldCount; i++) { - Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null); - fields.add(field); - } - - Schema schema = new Schema(fields); - VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); - for (Field field : fields) { - VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field); - vector.allocateNew(rowCount); - for (int i = 0; i < rowCount; i++) { - vector.set(i, "Value".getBytes(StandardCharsets.UTF_8)); - } - } - vectorSchemaRoot.setRowCount(rowCount); - - return vectorSchemaRoot; - } } diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 84a609ad..c866fb90 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -26,16 +26,20 @@ import java.util.Map; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.Metadata; import io.grpc.ProxyDetector; import io.grpc.internal.GrpcUtil; import org.apache.arrow.flight.CallHeaders; import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightClientMiddleware; import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.HeaderCallOption; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.grpc.MetadataAdapter; import org.apache.arrow.memory.RootAllocator; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -325,6 +329,40 @@ void createProxyDetector() { } } + @Test + void concatCallOptions() throws Exception { + ClientConfig clientConfig = new ClientConfig.Builder() + .host("https://localhost:80") + .build(); + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + CallOption[] results = flightSqlClient.concatCallOptions(null, null); + Assertions.assertThat(results).isNull(); + }); + + MetadataAdapter metadata = new MetadataAdapter(new Metadata()); + metadata.insert("key1", "value1"); + HeaderCallOption headerCallOption = new HeaderCallOption(metadata); + + CallOption[] callOptions = flightSqlClient.concatCallOptions(null, headerCallOption); + Assertions.assertThat(callOptions).isNotNull(); + Assertions.assertThat(callOptions.length).isEqualTo(1); + + callOptions = flightSqlClient.concatCallOptions(new CallOption[]{headerCallOption}); + Assertions.assertThat(callOptions).isNotNull(); + Assertions.assertThat(callOptions.length).isEqualTo(1); + + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + .withMaxOutboundMessageSize(1) + .withCompressorName("gzip") + .build(); + + callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptionCallback(), headerCallOption); + Assertions.assertThat(callOptions).isNotNull(); + Assertions.assertThat(callOptions.length).isEqualTo(3); + } + } + static class CallHeadersMiddleware implements FlightClientMiddleware.Factory { CallHeaders headers; diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 3040ef70..5127f177 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -21,11 +21,38 @@ */ package com.influxdb.v3.client.query; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import javax.annotation.Nonnull; + +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallOptions; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.internal.GrpcCallOption; class QueryOptionsTest { @@ -73,4 +100,146 @@ void optionsOverrideQueryType() { Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(options.queryTypeSafe()).isEqualTo(QueryType.InfluxQL); } + + @Test + void setInboundMessageSizeSmall() throws Exception { + URI uri = URI.create("http://127.0.0.1:33333"); + int rowCount = 100; + try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig.Builder builder = new ClientConfig.Builder() + .host(host) + .database("test"); + + // Set very small message size for testing + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + .withMaxInboundMessageSize(200) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOption(grpcCallOption); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { + try (Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"", + queryOptions + )) { + try { + Assertions.assertThatThrownBy(stream::count); + } catch (FlightRuntimeException e) { + Assertions.assertThat(e.status().code()).isEqualTo(CallStatus.RESOURCE_EXHAUSTED.code()); + } + } + } + } + } + + @Test + void setInboundMessageSizeLarge() throws Exception { + URI uri = URI.create("http://127.0.0.1:33333"); + int rowCount = 100; + try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .build(); + + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOption(grpcCallOption); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + } + } + + @Test + void grpcCallOption() { + GrpcCallOption.Builder builder = new GrpcCallOption.Builder(); + builder.withMaxInboundMessageSize(1024); + builder.withMaxOutboundMessageSize(1024); + builder.withCompressorName("my-compressor"); + builder.withDeadlineAfter(2, TimeUnit.HOURS); + builder.withExecutor(Runnable::run); + builder.withWaitForReady(); + + GrpcCallOption callOption = builder.build(); + Assertions.assertThat(callOption.getMaxInboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(callOption.getMaxOutboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(callOption.getCompressorName()).isEqualTo("my-compressor"); + Assertions.assertThat(callOption.getDeadlineAfter()).isNotNull(); + Assertions.assertThat(callOption.getExecutor()).isNotNull(); + Assertions.assertThat(callOption.getWaitForReady()).isTrue(); + + CallOption[] callBackArray = callOption.getCallOptionCallback(); + Assertions.assertThat(callBackArray).isNotNull(); + Assertions.assertThat(callBackArray.length).isEqualTo(6); + for (CallOption option : callBackArray) { + Assertions.assertThat(option).isInstanceOf(CallOptions.GrpcCallOption.class); + } + } + + private FlightServer simpleFlightServer(@Nonnull final URI uri, + @Nonnull final BufferAllocator allocator, + @Nonnull final NoOpFlightProducer producer) throws Exception { + Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort()); + return FlightServer.builder(allocator, location, producer).build(); + } + + private NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) { + return new NoOpFlightProducer() { + @Override + public void getStream(final CallContext context, + final Ticket ticket, + final ServerStreamListener listener) { + listener.start(vectorSchemaRoot); + if (listener.isReady()) { + listener.putNext(); + } + listener.completed(); + } + }; + } + + private VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) { + List fields = new ArrayList<>(); + for (int i = 0; i < fieldCount; i++) { + Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null); + fields.add(field); + } + + Schema schema = new Schema(fields); + VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); + for (Field field : fields) { + VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field); + vector.allocateNew(rowCount); + for (int i = 0; i < rowCount; i++) { + vector.set(i, "Value".getBytes(StandardCharsets.UTF_8)); + } + } + vectorSchemaRoot.setRowCount(rowCount); + + return vectorSchemaRoot; + } } From ec08ca9d129d81def2df90c9493ecb5108a2dd5d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 08:06:47 +0700 Subject: [PATCH 05/23] chore: add license --- .../v3/client/internal/GrpcCallOption.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java index 9d62c52f..132038cb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.internal; import java.util.ArrayList; From 61c98a07b8ac63448f330ee31b8d0851c1bb487f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 08:17:52 +0700 Subject: [PATCH 06/23] fix: checkstyle --- .../v3/client/internal/GrpcCallOption.java | 37 ++++++++++--------- .../client/internal/InfluxDBClientImpl.java | 9 ++++- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java index 132038cb..d05c9754 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -89,7 +89,7 @@ public String getCompressorName() { } /** - * Returns the wait for ready flag + * Returns the wait for ready flag. * * @return the wait for ready flag */ @@ -130,8 +130,10 @@ public CallOption[] getCallOptionCallback() { } @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(final Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } GrpcCallOption that = (GrpcCallOption) o; return Objects.equals(deadLineAfter, that.deadLineAfter) && Objects.equals(executor, that.executor) @@ -154,14 +156,15 @@ public int hashCode() { @Override public String toString() { - return "GrpcCallOption{" + - "deadLineAfter=" + deadLineAfter + - ", executor=" + executor + - ", compressorName='" + compressorName + '\'' + - ", waitForReady=" + waitForReady + - ", maxInboundMessageSize=" + maxInboundMessageSize + - ", maxOutboundMessageSize=" + maxOutboundMessageSize + - '}'; + return "GrpcCallOption{" + + "deadLineAfter=" + deadLineAfter + + ", executor=" + executor + + ", compressorName='" + compressorName + + '\'' + + ", waitForReady=" + waitForReady + + ", maxInboundMessageSize=" + maxInboundMessageSize + + ", maxOutboundMessageSize=" + maxOutboundMessageSize + + '}'; } public static final class Builder { @@ -183,7 +186,7 @@ public static final class Builder { public Builder withDeadlineAfter(final long duration, @Nonnull final TimeUnit timeUnit) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withDeadlineAfter(duration, timeUnit); } }; @@ -201,7 +204,7 @@ public > T wrapStub(T stub) { public Builder withExecutor(@Nonnull final Executor executor) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withExecutor(executor); } }; @@ -223,7 +226,7 @@ public > T wrapStub(T stub) { public Builder withCompressorName(@Nonnull final String compressorName) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withCompression(compressorName); } }; @@ -243,7 +246,7 @@ public > T wrapStub(T stub) { public Builder withWaitForReady() { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withWaitForReady(); } }; @@ -261,7 +264,7 @@ public > T wrapStub(T stub) { public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessageSize) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withMaxInboundMessageSize(maxInboundMessageSize); } }; @@ -278,7 +281,7 @@ public > T wrapStub(T stub) { public Builder withMaxOutboundMessageSize(@Nonnull final Integer maxOutboundMessageSize) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withMaxOutboundMessageSize(maxOutboundMessageSize); } }; diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 1645a958..a2114043 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -344,7 +344,14 @@ private Stream queryData(@Nonnull final String query, GrpcCallOption grpcCallOption = options.grpcCallOption(); CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptionCallback() : null; - return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe(), callOptions); + return flightSqlClient.execute( + query, + database, + options.queryTypeSafe(), + parameters, + options.headersSafe(), + callOptions + ); } @Nonnull From f82f8be6963e71e8a267b4fb9d9c7a1ec7cc733f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 09:23:12 +0700 Subject: [PATCH 07/23] feat: improve test --- .../v3/client/query/QueryOptionsTest.java | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 5127f177..c63a3c2b 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -25,10 +25,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import javax.annotation.Nonnull; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallOptions; import org.apache.arrow.flight.CallStatus; @@ -37,6 +38,7 @@ import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.impl.FlightServiceGrpc; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VarCharVector; @@ -176,28 +178,25 @@ void setInboundMessageSizeLarge() throws Exception { @Test void grpcCallOption() { - GrpcCallOption.Builder builder = new GrpcCallOption.Builder(); - builder.withMaxInboundMessageSize(1024); - builder.withMaxOutboundMessageSize(1024); - builder.withCompressorName("my-compressor"); - builder.withDeadlineAfter(2, TimeUnit.HOURS); - builder.withExecutor(Runnable::run); - builder.withWaitForReady(); - - GrpcCallOption callOption = builder.build(); - Assertions.assertThat(callOption.getMaxInboundMessageSize()).isEqualTo(1024); - Assertions.assertThat(callOption.getMaxOutboundMessageSize()).isEqualTo(1024); - Assertions.assertThat(callOption.getCompressorName()).isEqualTo("my-compressor"); - Assertions.assertThat(callOption.getDeadlineAfter()).isNotNull(); - Assertions.assertThat(callOption.getExecutor()).isNotNull(); - Assertions.assertThat(callOption.getWaitForReady()).isTrue(); - - CallOption[] callBackArray = callOption.getCallOptionCallback(); - Assertions.assertThat(callBackArray).isNotNull(); - Assertions.assertThat(callBackArray.length).isEqualTo(6); - for (CallOption option : callBackArray) { - Assertions.assertThat(option).isInstanceOf(CallOptions.GrpcCallOption.class); + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withCompressorName("my-compressor") + .withWaitForReady() + .build(); + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) + .usePlaintext() + .build(); + FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); + for (CallOption option : grpcCallOption.getCallOptionCallback()) { + stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); } + + io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); + Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()).isEqualTo(grpcCallOption.getMaxInboundMessageSize()); + Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()).isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); + Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); + Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); } private FlightServer simpleFlightServer(@Nonnull final URI uri, From 44dd08de72afd145deaba44689a80e88abd51c72 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 09:44:47 +0700 Subject: [PATCH 08/23] feat: improve test --- .../v3/client/internal/GrpcCallOption.java | 31 +++++++++---------- .../v3/client/query/QueryOptionsTest.java | 16 ++++++++-- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java index d05c9754..0d8b342c 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -40,7 +39,7 @@ */ public final class GrpcCallOption { - private final Deadline deadLineAfter; + private final Deadline deadline; private final Executor executor; private final String compressorName; private final Boolean waitForReady; @@ -49,7 +48,7 @@ public final class GrpcCallOption { private final CallOption[] callOptionCallback; private GrpcCallOption(@Nonnull final Builder builder) { - this.deadLineAfter = builder.deadLineAfter; + this.deadline = builder.deadline; this.executor = builder.executor; this.compressorName = builder.compressorName; this.waitForReady = builder.waitForReady; @@ -59,13 +58,13 @@ private GrpcCallOption(@Nonnull final Builder builder) { } /** - * Returns the deadline that is after the given duration from now. + * Returns the absolute deadline for a call. * * @return the Deadline object */ @Nullable - public Deadline getDeadlineAfter() { - return deadLineAfter; + public Deadline getDeadline() { + return deadline; } /** @@ -135,7 +134,7 @@ public boolean equals(final Object o) { return false; } GrpcCallOption that = (GrpcCallOption) o; - return Objects.equals(deadLineAfter, that.deadLineAfter) + return Objects.equals(deadline, that.deadline) && Objects.equals(executor, that.executor) && Objects.equals(compressorName, that.compressorName) && Objects.equals(waitForReady, that.waitForReady) @@ -145,7 +144,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(deadLineAfter, + return Objects.hash(deadline, executor, compressorName, waitForReady, @@ -157,7 +156,7 @@ public int hashCode() { @Override public String toString() { return "GrpcCallOption{" - + "deadLineAfter=" + deadLineAfter + + "deadline=" + deadline + ", executor=" + executor + ", compressorName='" + compressorName + '\'' @@ -168,7 +167,7 @@ public String toString() { } public static final class Builder { - private Deadline deadLineAfter; + private Deadline deadline; private Executor executor; private String compressorName; private Boolean waitForReady; @@ -177,20 +176,18 @@ public static final class Builder { private final List callOptions = new ArrayList<>(); /** - * Sets a deadline that is after the given {@code duration} from - * now. - * @param duration The duration - * @param timeUnit The time unit + * Sets the absolute deadline for a rpc call. + * @param deadline The deadline * @return this */ - public Builder withDeadlineAfter(final long duration, @Nonnull final TimeUnit timeUnit) { + public Builder withDeadline(final @Nonnull Deadline deadline) { var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override public > T wrapStub(final T stub) { - return stub.withDeadlineAfter(duration, timeUnit); + return stub.withDeadline(deadline); } }; - this.deadLineAfter = Deadline.after(duration, timeUnit); + this.deadline = deadline; callOptions.add(callOption); return this; } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index c63a3c2b..ae6a187c 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -25,9 +25,13 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import javax.annotation.Nonnull; +import io.grpc.Deadline; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.arrow.flight.CallOption; @@ -178,11 +182,15 @@ void setInboundMessageSizeLarge() throws Exception { @Test void grpcCallOption() { + Executor executor = Executors.newSingleThreadExecutor(); + Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() .withMaxInboundMessageSize(1024) .withMaxOutboundMessageSize(1024) .withCompressorName("my-compressor") .withWaitForReady() + .withExecutor(executor) + .withDeadline(deadline) .build(); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) .usePlaintext() @@ -193,10 +201,14 @@ void grpcCallOption() { } io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); - Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()).isEqualTo(grpcCallOption.getMaxInboundMessageSize()); - Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()).isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); + Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()) + .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); + Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()) + .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); + Assertions.assertThat(stubCallOptions.getExecutor()).isEqualTo(grpcCallOption.getExecutor()); + Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); } private FlightServer simpleFlightServer(@Nonnull final URI uri, From d334630e74d046ae50acc44d0619cee7b727ea8f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:15:46 +0700 Subject: [PATCH 09/23] feat: default max message size --- .../v3/client/query/QueryOptions.java | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index ec15d62a..afbe7cd8 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -146,8 +146,51 @@ public Map headersSafe() { return headers; } + /** + * The grpcCallOption pass to this function will be merged with the default grpcCallOption. + * @param: the grpcCallOption + */ public void setGrpcCallOption(@Nonnull final GrpcCallOption grpcCallOption) { - this.grpcCallOption = grpcCallOption; + GrpcCallOption.Builder builder = getDefaultGrpcCallOptsBuilder(grpcCallOption); + + if (grpcCallOption.getMaxOutboundMessageSize() != null) { + builder.withMaxOutboundMessageSize(grpcCallOption.getMaxOutboundMessageSize()); + } + + if (grpcCallOption.getExecutor() != null) { + builder.withExecutor(grpcCallOption.getExecutor()); + } + + if (grpcCallOption.getWaitForReady() != null) { + builder.withWaitForReady(); + } + + if (grpcCallOption.getDeadline() != null) { + builder.withDeadline(grpcCallOption.getDeadline()); + } + + if (grpcCallOption.getCompressorName() != null) { + builder.withCompressorName(grpcCallOption.getCompressorName()); + } + + this.grpcCallOption = builder.build(); + } + + /** + * @param grpcCallOption the grpcCallOption. + * @return the default grpc builder with some default options + */ + @Nonnull + private static GrpcCallOption.Builder getDefaultGrpcCallOptsBuilder(@Nonnull final GrpcCallOption grpcCallOption) { + GrpcCallOption.Builder builder = new GrpcCallOption.Builder(); + if (grpcCallOption.getMaxInboundMessageSize() != null) { + builder.withMaxInboundMessageSize(grpcCallOption.getMaxInboundMessageSize()); + } else { + // Set this for backward compatibility + builder.withMaxInboundMessageSize(Integer.MAX_VALUE); + } + + return builder; } @Nullable From 5fed3f5300ce7c0da437e3f4460973e72631bd50 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:25:12 +0700 Subject: [PATCH 10/23] chore: add more tests --- .../com/influxdb/v3/client/internal/GrpcCallOption.java | 3 +++ .../java/com/influxdb/v3/client/query/QueryOptions.java | 5 ++++- .../com/influxdb/v3/client/query/QueryOptionsTest.java | 7 +++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java index 0d8b342c..6f7ea805 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -166,6 +166,9 @@ public String toString() { + '}'; } + /** + * Builder for GrpcCallOption + */ public static final class Builder { private Deadline deadline; private Executor executor; diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index afbe7cd8..35e720c6 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -148,7 +148,7 @@ public Map headersSafe() { /** * The grpcCallOption pass to this function will be merged with the default grpcCallOption. - * @param: the grpcCallOption + * @param grpcCallOption the grpcCallOption */ public void setGrpcCallOption(@Nonnull final GrpcCallOption grpcCallOption) { GrpcCallOption.Builder builder = getDefaultGrpcCallOptsBuilder(grpcCallOption); @@ -193,6 +193,9 @@ private static GrpcCallOption.Builder getDefaultGrpcCallOptsBuilder(@Nonnull fin return builder; } + /** + * @return grpc call options with some default options + */ @Nullable public GrpcCallOption grpcCallOption() { return grpcCallOption; diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index ae6a187c..3d27901a 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -180,6 +180,13 @@ void setInboundMessageSizeLarge() throws Exception { } } + @Test + void grpcCallOptionDefaultOptions() { + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder().build(); + Assertions.assertThat(grpcCallOption.getMaxInboundMessageSize()) + .isEqualTo(Integer.MAX_VALUE); + } + @Test void grpcCallOption() { Executor executor = Executors.newSingleThreadExecutor(); From 368ff638ce2181a5d8fa6c37571c0e70d1308f48 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:31:51 +0700 Subject: [PATCH 11/23] chore: add more tests --- .../com/influxdb/v3/client/query/QueryOptionsTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 3d27901a..4e77de02 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -182,9 +182,11 @@ void setInboundMessageSizeLarge() throws Exception { @Test void grpcCallOptionDefaultOptions() { - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder().build(); - Assertions.assertThat(grpcCallOption.getMaxInboundMessageSize()) - .isEqualTo(Integer.MAX_VALUE); + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOption(new GrpcCallOption.Builder().build()); + Assertions.assertThat(queryOptions.grpcCallOption()).isNotNull(); + Assertions.assertThat(queryOptions.grpcCallOption() + .getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); } @Test From 0266c6a843c30bb0783b6cdf2834304896bff29d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:34:38 +0700 Subject: [PATCH 12/23] fix: check style --- .../java/com/influxdb/v3/client/internal/GrpcCallOption.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java index 6f7ea805..5dc9b1e8 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java @@ -167,7 +167,7 @@ public String toString() { } /** - * Builder for GrpcCallOption + * Builder for GrpcCallOption. */ public static final class Builder { private Deadline deadline; From 3d898ff5fd1322646dfcba806e2c252443973708 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:48:22 +0700 Subject: [PATCH 13/23] chore: add tests --- .../v3/client/query/QueryOptionsTest.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 4e77de02..9e9f31a1 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -180,13 +180,39 @@ void setInboundMessageSizeLarge() throws Exception { } } + @Test + void setGrpcCallOption() { + Executor executor = Executors.newSingleThreadExecutor(); + Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); + String compressorName = "name"; + + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder().withExecutor(executor) + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withWaitForReady() + .withDeadline(deadline) + .withCompressorName(compressorName) + .build(); + + QueryOptions options = new QueryOptions("test"); + options.setGrpcCallOption(grpcCallOption); + Assertions.assertThat(options.grpcCallOption()).isNotNull(); + Assertions.assertThat(options.grpcCallOption().getMaxInboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOption().getMaxOutboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOption().getExecutor()).isEqualTo(executor); + Assertions.assertThat(options.grpcCallOption().getWaitForReady()).isTrue(); + Assertions.assertThat(options.grpcCallOption().getCompressorName()).isEqualTo(compressorName); + Assertions.assertThat(options.grpcCallOption().getDeadline()).isEqualTo(deadline); + + } + @Test void grpcCallOptionDefaultOptions() { QueryOptions queryOptions = new QueryOptions("test"); queryOptions.setGrpcCallOption(new GrpcCallOption.Builder().build()); Assertions.assertThat(queryOptions.grpcCallOption()).isNotNull(); Assertions.assertThat(queryOptions.grpcCallOption() - .getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + .getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); } @Test From 87d12987bc0312c91f9c5c11d3a0213e5383e821 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 2 Apr 2025 10:58:30 +0700 Subject: [PATCH 14/23] [EMPTY] trigger CI From 9498ff07c5be898a08134dec688d7e6c1565062a Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 3 Apr 2025 10:48:54 +0700 Subject: [PATCH 15/23] refactor: change class name --- .../v3/client/internal/FlightSqlClient.java | 6 +- ...pcCallOption.java => GrpcCallOptions.java} | 149 ++++++++++++------ .../client/internal/InfluxDBClientImpl.java | 4 +- .../v3/client/query/QueryOptions.java | 53 +------ .../client/internal/FlightSqlClientTest.java | 7 +- .../v3/client/query/QueryOptionsTest.java | 14 +- 6 files changed, 121 insertions(+), 112 deletions(-) rename src/main/java/com/influxdb/v3/client/internal/{GrpcCallOption.java => GrpcCallOptions.java} (75%) diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index ca5bb738..d3fbcea9 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -110,7 +110,7 @@ Stream execute(@Nonnull final String query, @Nonnull final QueryType queryType, @Nonnull final Map queryParameters, @Nonnull final Map headers, - final CallOption... callOption) { + final CallOption... callOptions) { Map ticketData = new HashMap<>() {{ put("database", database); @@ -130,10 +130,10 @@ Stream execute(@Nonnull final String query, } HeaderCallOption headerCallOption = metadataHeader(headers); - CallOption[] callOptions = concatCallOptions(callOption, headerCallOption); + CallOption[] callOptionArray = concatCallOptions(callOptions, headerCallOption); Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - FlightStream stream = client.getStream(ticket, callOptions); + FlightStream stream = client.getStream(ticket, callOptionArray); FlightSqlIterator iterator = new FlightSqlIterator(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java similarity index 75% rename from src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java rename to src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 5dc9b1e8..2d6ad36f 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOption.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -37,7 +37,7 @@ /** * The collection of runtime options for a new RPC call. */ -public final class GrpcCallOption { +public final class GrpcCallOptions { private final Deadline deadline; private final Executor executor; @@ -45,16 +45,16 @@ public final class GrpcCallOption { private final Boolean waitForReady; private final Integer maxInboundMessageSize; private final Integer maxOutboundMessageSize; - private final CallOption[] callOptionCallback; + private final CallOption[] callOptions; - private GrpcCallOption(@Nonnull final Builder builder) { + private GrpcCallOptions(@Nonnull final Builder builder) { this.deadline = builder.deadline; this.executor = builder.executor; this.compressorName = builder.compressorName; this.waitForReady = builder.waitForReady; this.maxInboundMessageSize = builder.maxInboundMessageSize; this.maxOutboundMessageSize = builder.maxOutboundMessageSize; - this.callOptionCallback = builder.callOptions.toArray(new CallOption[0]); + this.callOptions = builder.callOptions.toArray(new CallOption[0]); } /** @@ -124,8 +124,8 @@ public Integer getMaxOutboundMessageSize() { * @return the CallOption list */ @Nonnull - public CallOption[] getCallOptionCallback() { - return callOptionCallback; + public CallOption[] getCallOptions() { + return callOptions; } @Override @@ -133,7 +133,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - GrpcCallOption that = (GrpcCallOption) o; + GrpcCallOptions that = (GrpcCallOptions) o; return Objects.equals(deadline, that.deadline) && Objects.equals(executor, that.executor) && Objects.equals(compressorName, that.compressorName) @@ -174,7 +174,7 @@ public static final class Builder { private Executor executor; private String compressorName; private Boolean waitForReady; - private Integer maxInboundMessageSize; + private Integer maxInboundMessageSize = Integer.MAX_VALUE; private Integer maxOutboundMessageSize; private final List callOptions = new ArrayList<>(); @@ -184,14 +184,7 @@ public static final class Builder { * @return this */ public Builder withDeadline(final @Nonnull Deadline deadline) { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withDeadline(deadline); - } - }; this.deadline = deadline; - callOptions.add(callOption); return this; } @@ -202,14 +195,7 @@ public > T wrapStub(final T stub) { * @return this */ public Builder withExecutor(@Nonnull final Executor executor) { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withExecutor(executor); - } - }; this.executor = executor; - callOptions.add(callOption); return this; } @@ -224,14 +210,7 @@ public > T wrapStub(final T stub) { * @return this */ public Builder withCompressorName(@Nonnull final String compressorName) { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withCompression(compressorName); - } - }; this.compressorName = compressorName; - callOptions.add(callOption); return this; } @@ -244,14 +223,7 @@ public > T wrapStub(final T stub) { * @return this */ public Builder withWaitForReady() { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withWaitForReady(); - } - }; this.waitForReady = true; - callOptions.add(callOption); return this; } @@ -262,14 +234,7 @@ public > T wrapStub(final T stub) { * @return this */ public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessageSize) { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withMaxInboundMessageSize(maxInboundMessageSize); - } - }; this.maxInboundMessageSize = maxInboundMessageSize; - callOptions.add(callOption); return this; } @@ -279,24 +244,106 @@ public > T wrapStub(final T stub) { * @return this */ public Builder withMaxOutboundMessageSize(@Nonnull final Integer maxOutboundMessageSize) { - var callOption = new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + this.maxOutboundMessageSize = maxOutboundMessageSize; + return this; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createDeadlineCallOption( + final Deadline deadline) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withDeadline(deadline); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createExecutorCallOption( + final Executor executor) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withExecutor(executor); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createCompressionCallOption( + final String compressorName) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withCompression(compressorName); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createWaitForReadyCallOption() { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withWaitForReady(); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createMaxInboundMessageSizeCallOption( + final Integer maxInboundMessageSize) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withMaxInboundMessageSize(maxInboundMessageSize); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createMaxOutboundMessageSizeCallOption( + final Integer maxOutboundMessageSize) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { @Override public > T wrapStub(final T stub) { return stub.withMaxOutboundMessageSize(maxOutboundMessageSize); } }; - this.maxOutboundMessageSize = maxOutboundMessageSize; - callOptions.add(callOption); - return this; } /** - * Build an instance of GrpcCallOption. + * Build an instance of GrpcCallOptions. * - * @return the GrpcCallOption instance + * @return the GrpcCallOptions instance */ - public GrpcCallOption build() { - return new GrpcCallOption(this); + public GrpcCallOptions build() { + if (deadline != null) { + var callOption = createDeadlineCallOption(deadline); + callOptions.add(callOption); + } + + if (executor != null) { + var callOption = createExecutorCallOption(executor); + callOptions.add(callOption); + } + + if (compressorName != null) { + var callOption = createCompressionCallOption(compressorName); + callOptions.add(callOption); + } + + if (waitForReady != null) { + var callOption = createWaitForReadyCallOption(); + callOptions.add(callOption); + } + + if (maxInboundMessageSize != null) { + var callOption = createMaxInboundMessageSizeCallOption(maxInboundMessageSize); + callOptions.add(callOption); + } + + if (maxOutboundMessageSize != null) { + var callOption = createMaxOutboundMessageSizeCallOption(maxOutboundMessageSize); + callOptions.add(callOption); + } + + return new GrpcCallOptions(this); } } } diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index a2114043..1d401dac 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -342,8 +342,8 @@ private Stream queryData(@Nonnull final String query, } }); - GrpcCallOption grpcCallOption = options.grpcCallOption(); - CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptionCallback() : null; + GrpcCallOptions grpcCallOption = options.grpcCallOption(); + CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : null; return flightSqlClient.execute( query, database, diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 35e720c6..87ffbca3 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -28,7 +28,7 @@ import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.Arguments; -import com.influxdb.v3.client.internal.GrpcCallOption; +import com.influxdb.v3.client.internal.GrpcCallOptions; /** * Query API options. @@ -62,7 +62,7 @@ public final class QueryOptions { private final String database; private final QueryType queryType; private final Map headers; - private GrpcCallOption grpcCallOption; + private GrpcCallOptions grpcCallOption; /** * Construct QueryAPI options. The query type is set to SQL. @@ -147,57 +147,18 @@ public Map headersSafe() { } /** - * The grpcCallOption pass to this function will be merged with the default grpcCallOption. + * Sets the GrpcCallOptions object. * @param grpcCallOption the grpcCallOption */ - public void setGrpcCallOption(@Nonnull final GrpcCallOption grpcCallOption) { - GrpcCallOption.Builder builder = getDefaultGrpcCallOptsBuilder(grpcCallOption); - - if (grpcCallOption.getMaxOutboundMessageSize() != null) { - builder.withMaxOutboundMessageSize(grpcCallOption.getMaxOutboundMessageSize()); - } - - if (grpcCallOption.getExecutor() != null) { - builder.withExecutor(grpcCallOption.getExecutor()); - } - - if (grpcCallOption.getWaitForReady() != null) { - builder.withWaitForReady(); - } - - if (grpcCallOption.getDeadline() != null) { - builder.withDeadline(grpcCallOption.getDeadline()); - } - - if (grpcCallOption.getCompressorName() != null) { - builder.withCompressorName(grpcCallOption.getCompressorName()); - } - - this.grpcCallOption = builder.build(); - } - - /** - * @param grpcCallOption the grpcCallOption. - * @return the default grpc builder with some default options - */ - @Nonnull - private static GrpcCallOption.Builder getDefaultGrpcCallOptsBuilder(@Nonnull final GrpcCallOption grpcCallOption) { - GrpcCallOption.Builder builder = new GrpcCallOption.Builder(); - if (grpcCallOption.getMaxInboundMessageSize() != null) { - builder.withMaxInboundMessageSize(grpcCallOption.getMaxInboundMessageSize()); - } else { - // Set this for backward compatibility - builder.withMaxInboundMessageSize(Integer.MAX_VALUE); - } - - return builder; + public void setGrpcCallOption(@Nonnull final GrpcCallOptions grpcCallOption) { + this.grpcCallOption = grpcCallOption; } /** - * @return grpc call options with some default options + * @return the GrpcCallOptions object. */ @Nullable - public GrpcCallOption grpcCallOption() { + public GrpcCallOptions grpcCallOption() { return grpcCallOption; } diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index c866fb90..881e1265 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -352,14 +352,15 @@ void concatCallOptions() throws Exception { Assertions.assertThat(callOptions).isNotNull(); Assertions.assertThat(callOptions.length).isEqualTo(1); - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() .withMaxOutboundMessageSize(1) .withCompressorName("gzip") .build(); - callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptionCallback(), headerCallOption); + callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptions(), headerCallOption); Assertions.assertThat(callOptions).isNotNull(); - Assertions.assertThat(callOptions.length).isEqualTo(3); + // This equals to 4 because we always have a default maxInboundMessageSize + Assertions.assertThat(callOptions.length).isEqualTo(4); } } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 9e9f31a1..aca50823 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -58,7 +58,7 @@ import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; -import com.influxdb.v3.client.internal.GrpcCallOption; +import com.influxdb.v3.client.internal.GrpcCallOptions; class QueryOptionsTest { @@ -123,7 +123,7 @@ void setInboundMessageSizeSmall() throws Exception { .database("test"); // Set very small message size for testing - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() .withMaxInboundMessageSize(200) .build(); @@ -161,7 +161,7 @@ void setInboundMessageSizeLarge() throws Exception { .database("test") .build(); - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() .withMaxInboundMessageSize(1024 * 1024 * 1024) .build(); @@ -186,7 +186,7 @@ void setGrpcCallOption() { Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); String compressorName = "name"; - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder().withExecutor(executor) + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder().withExecutor(executor) .withMaxInboundMessageSize(1024) .withMaxOutboundMessageSize(1024) .withWaitForReady() @@ -209,7 +209,7 @@ void setGrpcCallOption() { @Test void grpcCallOptionDefaultOptions() { QueryOptions queryOptions = new QueryOptions("test"); - queryOptions.setGrpcCallOption(new GrpcCallOption.Builder().build()); + queryOptions.setGrpcCallOption(new GrpcCallOptions.Builder().build()); Assertions.assertThat(queryOptions.grpcCallOption()).isNotNull(); Assertions.assertThat(queryOptions.grpcCallOption() .getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); @@ -219,7 +219,7 @@ void grpcCallOptionDefaultOptions() { void grpcCallOption() { Executor executor = Executors.newSingleThreadExecutor(); Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); - GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() .withMaxInboundMessageSize(1024) .withMaxOutboundMessageSize(1024) .withCompressorName("my-compressor") @@ -231,7 +231,7 @@ void grpcCallOption() { .usePlaintext() .build(); FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); - for (CallOption option : grpcCallOption.getCallOptionCallback()) { + for (CallOption option : grpcCallOption.getCallOptions()) { stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); } From 45eaee95bca18b78579b698caec08c425cf7979a Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 3 Apr 2025 10:52:24 +0700 Subject: [PATCH 16/23] refactor: change class name --- .../java/com/influxdb/v3/client/internal/GrpcCallOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 2d6ad36f..8417516b 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -155,7 +155,7 @@ public int hashCode() { @Override public String toString() { - return "GrpcCallOption{" + return "GrpcCallOptions{" + "deadline=" + deadline + ", executor=" + executor + ", compressorName='" + compressorName From 65118444af3203c4e955cf2a711a870ea6ed332f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 3 Apr 2025 15:37:40 +0700 Subject: [PATCH 17/23] refactor: remove default inbound message size --- .../v3/client/internal/GrpcCallOptions.java | 2 +- .../client/internal/InfluxDBClientImpl.java | 38 +++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 8417516b..3967b468 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -174,7 +174,7 @@ public static final class Builder { private Executor executor; private String compressorName; private Boolean waitForReady; - private Integer maxInboundMessageSize = Integer.MAX_VALUE; + private Integer maxInboundMessageSize; private Integer maxOutboundMessageSize; private final List callOptions = new ArrayList<>(); diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 1d401dac..f5667740 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -37,8 +37,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import io.grpc.stub.AbstractStub; import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallOptions; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -63,6 +65,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient { private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter " + "or use default configuration at 'ClientConfig.database'."; + private static final CallOption[] EMPTY_CALL_OPTIONS = new CallOption[0]; private static final Map NO_PARAMETERS = Map.of(); private static final List> ALLOWED_NAMED_PARAMETER_TYPES = List.of( String.class, @@ -342,18 +345,47 @@ private Stream queryData(@Nonnull final String query, } }); - GrpcCallOptions grpcCallOption = options.grpcCallOption(); - CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : null; + CallOption[] queryCallOptions = createQueryCallOptions(options); return flightSqlClient.execute( query, database, options.queryTypeSafe(), parameters, options.headersSafe(), - callOptions + queryCallOptions ); } + /** + * Creates an array of CallOption with some default CallOption. + * + * @param options the QueryOptions object + * @return the array of CallOption + */ + @Nonnull + CallOption[] createQueryCallOptions(@Nonnull final QueryOptions options) { + GrpcCallOptions grpcCallOption = options.grpcCallOption(); + CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : EMPTY_CALL_OPTIONS; + if (grpcCallOption == null || grpcCallOption.getMaxInboundMessageSize() == null) { + callOptions = Stream.concat( + Stream.of(maxInboundMessageCallOption()), + Stream.of(callOptions)) + .toArray(CallOption[]::new); + } + return callOptions; + } + + @Nonnull + private CallOption maxInboundMessageCallOption() { + return new CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(T stub) { + return stub.withMaxInboundMessageSize(Integer.MAX_VALUE); + } + }; + } + + @Nonnull private byte[] gzipData(@Nonnull final byte[] data) throws IOException { final ByteArrayOutputStream out = new ByteArrayOutputStream(); From cfb514bf37e600d4e2106df54d02060d899f41f0 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 3 Apr 2025 15:43:37 +0700 Subject: [PATCH 18/23] refactor: remove default inbound message size --- .../influxdb/v3/client/internal/FlightSqlClientTest.java | 2 +- .../com/influxdb/v3/client/query/QueryOptionsTest.java | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 881e1265..e70229ad 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -360,7 +360,7 @@ void concatCallOptions() throws Exception { callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptions(), headerCallOption); Assertions.assertThat(callOptions).isNotNull(); // This equals to 4 because we always have a default maxInboundMessageSize - Assertions.assertThat(callOptions.length).isEqualTo(4); + Assertions.assertThat(callOptions.length).isEqualTo(3); } } diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index aca50823..0eff25df 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -206,15 +206,6 @@ void setGrpcCallOption() { } - @Test - void grpcCallOptionDefaultOptions() { - QueryOptions queryOptions = new QueryOptions("test"); - queryOptions.setGrpcCallOption(new GrpcCallOptions.Builder().build()); - Assertions.assertThat(queryOptions.grpcCallOption()).isNotNull(); - Assertions.assertThat(queryOptions.grpcCallOption() - .getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); - } - @Test void grpcCallOption() { Executor executor = Executors.newSingleThreadExecutor(); From ceea03d6078b84d8b56dcecf0513e508f7591f71 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 4 Apr 2025 07:15:58 +0700 Subject: [PATCH 19/23] refactor: set default max message size in constructor --- .../v3/client/internal/FlightSqlClient.java | 14 +- .../v3/client/internal/GrpcCallOptions.java | 33 +++++ .../client/internal/InfluxDBClientImpl.java | 26 +--- .../client/internal/FlightSqlClientTest.java | 39 ------ .../client/internal/GrpcCallOptionsTest.java | 121 ++++++++++++++++++ 5 files changed, 158 insertions(+), 75 deletions(-) create mode 100644 src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index d3fbcea9..697693fb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -27,13 +27,11 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; import java.util.stream.Stream; @@ -130,7 +128,7 @@ Stream execute(@Nonnull final String query, } HeaderCallOption headerCallOption = metadataHeader(headers); - CallOption[] callOptionArray = concatCallOptions(callOptions, headerCallOption); + CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption); Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); FlightStream stream = client.getStream(ticket, callOptionArray); @@ -237,16 +235,6 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina }; } - @Nullable - CallOption[] concatCallOptions(@Nullable final CallOption[] base, final CallOption... callOption) { - if (base == null || base.length == 0) { - return callOption; - } - List results = new ArrayList<>(List.of(base)); - Arrays.stream(callOption).filter(Objects::nonNull).forEach(results::add); - return results.toArray(new CallOption[0]); - } - private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 3967b468..47968548 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -22,9 +22,12 @@ package com.influxdb.v3.client.internal; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Executor; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -57,6 +60,22 @@ private GrpcCallOptions(@Nonnull final Builder builder) { this.callOptions = builder.callOptions.toArray(new CallOption[0]); } + /** + * Merges two arrays of {@link CallOption} into a single array. The method combines the elements + * from the baseCallOptions array and the additional callOptions array. If either of the input + * arrays is null, it will be treated as an empty array. + * + * @param baseCallOptions the base array of {@link CallOption} instances, may be null + * @param callOptions additional {@link CallOption} instances to be added, may also be null + * @return a combined array containing all {@link CallOption} instances from both input arrays + */ + public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, final CallOption... callOptions) { + return Stream.concat( + Arrays.stream(Optional.ofNullable(baseCallOptions).orElse(new CallOption[0])), + Arrays.stream(Optional.ofNullable(callOptions).orElse(new CallOption[0])) + ).toArray(CallOption[]::new); + } + /** * Returns the absolute deadline for a call. * @@ -178,8 +197,17 @@ public static final class Builder { private Integer maxOutboundMessageSize; private final List callOptions = new ArrayList<>(); + /** + * Constructs a new instance of the Builder with default values. + * By default, the maximum inbound message size is set to the largest possible value. + */ + public Builder() { + this.maxInboundMessageSize = Integer.MAX_VALUE; + } + /** * Sets the absolute deadline for a rpc call. + * * @param deadline The deadline * @return this */ @@ -191,6 +219,7 @@ public Builder withDeadline(final @Nonnull Deadline deadline) { /** * Sets an {@code executor} to be used instead of the default * executor specified with {@link ManagedChannelBuilder#executor}. + * * @param executor The executor * @return this */ @@ -206,6 +235,7 @@ public Builder withExecutor(@Nonnull final Executor executor) { *

    It is only safe to call this if the server supports the compression format chosen. There is * no negotiation performed; if the server does not support the compression chosen, the call will * fail. + * * @param compressorName The compressor name * @return this */ @@ -220,6 +250,7 @@ public Builder withCompressorName(@Nonnull final String compressorName) { * available. This may dramatically increase the latency of the RPC, but avoids failing * "unnecessarily." The default queues the RPC until an attempt to connect has completed, but * fails RPCs without sending them if unable to connect. + * * @return this */ public Builder withWaitForReady() { @@ -230,6 +261,7 @@ public Builder withWaitForReady() { /** * Sets the maximum allowed message size acceptable from the remote peer. If unset, this will * default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}. + * * @param maxInboundMessageSize The max receive message size * @return this */ @@ -240,6 +272,7 @@ public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessag /** * Sets the maximum allowed message size acceptable sent to the remote peer. + * * @param maxOutboundMessageSize The maximum message send size * @return this */ diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index f5667740..0c31fbb1 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -65,7 +65,6 @@ public final class InfluxDBClientImpl implements InfluxDBClient { private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter " + "or use default configuration at 'ClientConfig.database'."; - private static final CallOption[] EMPTY_CALL_OPTIONS = new CallOption[0]; private static final Map NO_PARAMETERS = Map.of(); private static final List> ALLOWED_NAMED_PARAMETER_TYPES = List.of( String.class, @@ -345,41 +344,22 @@ private Stream queryData(@Nonnull final String query, } }); - CallOption[] queryCallOptions = createQueryCallOptions(options); + CallOption[] callOptions = options.grpcCallOption() != null ? options.grpcCallOption().getCallOptions() : null; return flightSqlClient.execute( query, database, options.queryTypeSafe(), parameters, options.headersSafe(), - queryCallOptions + callOptions ); } - /** - * Creates an array of CallOption with some default CallOption. - * - * @param options the QueryOptions object - * @return the array of CallOption - */ - @Nonnull - CallOption[] createQueryCallOptions(@Nonnull final QueryOptions options) { - GrpcCallOptions grpcCallOption = options.grpcCallOption(); - CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : EMPTY_CALL_OPTIONS; - if (grpcCallOption == null || grpcCallOption.getMaxInboundMessageSize() == null) { - callOptions = Stream.concat( - Stream.of(maxInboundMessageCallOption()), - Stream.of(callOptions)) - .toArray(CallOption[]::new); - } - return callOptions; - } - @Nonnull private CallOption maxInboundMessageCallOption() { return new CallOptions.GrpcCallOption() { @Override - public > T wrapStub(T stub) { + public > T wrapStub(final T stub) { return stub.withMaxInboundMessageSize(Integer.MAX_VALUE); } }; diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index e70229ad..84a609ad 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -26,20 +26,16 @@ import java.util.Map; import io.grpc.HttpConnectProxiedSocketAddress; -import io.grpc.Metadata; import io.grpc.ProxyDetector; import io.grpc.internal.GrpcUtil; import org.apache.arrow.flight.CallHeaders; import org.apache.arrow.flight.CallInfo; -import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightClientMiddleware; import org.apache.arrow.flight.FlightServer; -import org.apache.arrow.flight.HeaderCallOption; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; -import org.apache.arrow.flight.grpc.MetadataAdapter; import org.apache.arrow.memory.RootAllocator; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -329,41 +325,6 @@ void createProxyDetector() { } } - @Test - void concatCallOptions() throws Exception { - ClientConfig clientConfig = new ClientConfig.Builder() - .host("https://localhost:80") - .build(); - try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) { - Assertions.assertThatNoException().isThrownBy(() -> { - CallOption[] results = flightSqlClient.concatCallOptions(null, null); - Assertions.assertThat(results).isNull(); - }); - - MetadataAdapter metadata = new MetadataAdapter(new Metadata()); - metadata.insert("key1", "value1"); - HeaderCallOption headerCallOption = new HeaderCallOption(metadata); - - CallOption[] callOptions = flightSqlClient.concatCallOptions(null, headerCallOption); - Assertions.assertThat(callOptions).isNotNull(); - Assertions.assertThat(callOptions.length).isEqualTo(1); - - callOptions = flightSqlClient.concatCallOptions(new CallOption[]{headerCallOption}); - Assertions.assertThat(callOptions).isNotNull(); - Assertions.assertThat(callOptions.length).isEqualTo(1); - - GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() - .withMaxOutboundMessageSize(1) - .withCompressorName("gzip") - .build(); - - callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptions(), headerCallOption); - Assertions.assertThat(callOptions).isNotNull(); - // This equals to 4 because we always have a default maxInboundMessageSize - Assertions.assertThat(callOptions.length).isEqualTo(3); - } - } - static class CallHeadersMiddleware implements FlightClientMiddleware.Factory { CallHeaders headers; diff --git a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java new file mode 100644 index 00000000..23286fa3 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java @@ -0,0 +1,121 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import io.grpc.stub.AbstractStub; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallOptions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class GrpcCallOptionsTest { + + @Test + void testNotSetMaxInboundMessageSize() { + GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder().build(); + assertNotNull(grpcCallOptions); + assertEquals(Integer.MAX_VALUE, grpcCallOptions.getMaxInboundMessageSize()); + } + + @Test + void testSetMaxInboundMessageSize() { + GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .build(); + assertNotNull(grpcCallOptions); + assertEquals(2000, grpcCallOptions.getMaxInboundMessageSize()); + } + + @Test + void testMergeCallOptionsWithBothNonNullArrays() { + CallOption option1 = callOption(); + CallOption option2 = callOption(); + CallOption[] baseCallOptions = {option1}; + CallOption[] additionalCallOptions = {option2}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(2, result.length); + assertEquals(option1, result[0]); + assertEquals(option2, result[1]); + } + + @Test + void testMergeCallOptionsWithBaseCallOptionsNull() { + CallOption option1 = callOption(); + CallOption option2 = callOption(); + CallOption[] baseCallOptions = null; + CallOption[] additionalCallOptions = {option1, option2}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(2, result.length); + assertEquals(option1, result[0]); + assertEquals(option2, result[1]); + } + + @Test + void testMergeCallOptionsWithAdditionalCallOptionsNull() { + CallOption option1 = callOption(); + CallOption[] baseCallOptions = {option1}; + CallOption[] additionalCallOptions = null; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(1, result.length); + assertEquals(option1, result[0]); + } + + @Test + void testMergeCallOptionsWithBothArraysNull() { + CallOption[] result = GrpcCallOptions.mergeCallOptions(null, null); + + assertNotNull(result); + assertEquals(0, result.length); + } + + @Test + void testMergeCallOptionsWithEmptyArrays() { + CallOption[] baseCallOptions = {}; + CallOption[] additionalCallOptions = {}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(0, result.length); + } + + private CallOption callOption() { + return new CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withMaxInboundMessageSize(Integer.MAX_VALUE); + } + }; + } + +} From 21c4fefc687edab5663cbf2c1b268b30f5189325 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 4 Apr 2025 07:31:24 +0700 Subject: [PATCH 20/23] fix: linter --- .../java/com/influxdb/v3/client/internal/GrpcCallOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 47968548..14ea440d 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -69,7 +69,8 @@ private GrpcCallOptions(@Nonnull final Builder builder) { * @param callOptions additional {@link CallOption} instances to be added, may also be null * @return a combined array containing all {@link CallOption} instances from both input arrays */ - public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, final CallOption... callOptions) { + public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, + final CallOption... callOptions) { return Stream.concat( Arrays.stream(Optional.ofNullable(baseCallOptions).orElse(new CallOption[0])), Arrays.stream(Optional.ofNullable(callOptions).orElse(new CallOption[0])) From cfbdd3577506acee94676df9ce8f648caa53c0fa Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 8 Apr 2025 07:10:10 +0700 Subject: [PATCH 21/23] refactor: remove unused maxInboundMessageCallOption method --- .../v3/client/internal/InfluxDBClientImpl.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 0c31fbb1..933f35cd 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -37,10 +37,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import io.grpc.stub.AbstractStub; import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.flight.CallOption; -import org.apache.arrow.flight.CallOptions; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -355,17 +353,6 @@ private Stream queryData(@Nonnull final String query, ); } - @Nonnull - private CallOption maxInboundMessageCallOption() { - return new CallOptions.GrpcCallOption() { - @Override - public > T wrapStub(final T stub) { - return stub.withMaxInboundMessageSize(Integer.MAX_VALUE); - } - }; - } - - @Nonnull private byte[] gzipData(@Nonnull final byte[] data) throws IOException { final ByteArrayOutputStream out = new ByteArrayOutputStream(); From f580c4bbf83802684850e8836aa6622384a89be4 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 8 Apr 2025 09:26:58 +0700 Subject: [PATCH 22/23] refactor: change function name --- .../v3/client/internal/GrpcCallOptions.java | 18 +++++++++++ .../client/internal/InfluxDBClientImpl.java | 3 +- .../v3/client/query/QueryOptions.java | 12 +++---- .../v3/client/query/QueryOptionsTest.java | 32 ++++++++++++------- 4 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index 14ea440d..eb87f9a4 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -60,6 +60,24 @@ private GrpcCallOptions(@Nonnull final Builder builder) { this.callOptions = builder.callOptions.toArray(new CallOption[0]); } + + /** + * Creates a default instance of {@link GrpcCallOptions} with predefined settings. + *

    + * The default configuration includes: + *

    + * Other options can be customized using the {@link GrpcCallOptions.Builder}. + * + * @return the default configuration of {@link GrpcCallOptions}. + */ + public static GrpcCallOptions getDefaultOptions() { + GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder(); + builder.withMaxInboundMessageSize(Integer.MAX_VALUE); + return builder.build(); + } + /** * Merges two arrays of {@link CallOption} into a single array. The method combines the elements * from the baseCallOptions array and the additional callOptions array. If either of the input diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 933f35cd..ce6c0227 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -342,7 +342,8 @@ private Stream queryData(@Nonnull final String query, } }); - CallOption[] callOptions = options.grpcCallOption() != null ? options.grpcCallOption().getCallOptions() : null; + CallOption[] callOptions = options.grpcCallOptions() != null + ? options.grpcCallOptions().getCallOptions() : null; return flightSqlClient.execute( query, database, diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 87ffbca3..f616b52c 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -62,7 +62,7 @@ public final class QueryOptions { private final String database; private final QueryType queryType; private final Map headers; - private GrpcCallOptions grpcCallOption; + private GrpcCallOptions grpcCallOptions = GrpcCallOptions.getDefaultOptions(); /** * Construct QueryAPI options. The query type is set to SQL. @@ -148,18 +148,18 @@ public Map headersSafe() { /** * Sets the GrpcCallOptions object. - * @param grpcCallOption the grpcCallOption + * @param grpcCallOptions the grpcCallOptions */ - public void setGrpcCallOption(@Nonnull final GrpcCallOptions grpcCallOption) { - this.grpcCallOption = grpcCallOption; + public void setGrpcCallOptions(@Nonnull final GrpcCallOptions grpcCallOptions) { + this.grpcCallOptions = grpcCallOptions; } /** * @return the GrpcCallOptions object. */ @Nullable - public GrpcCallOptions grpcCallOption() { - return grpcCallOption; + public GrpcCallOptions grpcCallOptions() { + return grpcCallOptions; } private boolean isNotDefined(final String option) { diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 0eff25df..fe94db3e 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -128,7 +128,7 @@ void setInboundMessageSizeSmall() throws Exception { .build(); QueryOptions queryOptions = new QueryOptions("test"); - queryOptions.setGrpcCallOption(grpcCallOption); + queryOptions.setGrpcCallOptions(grpcCallOption); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { try (Stream stream = influxDBClient.queryPoints( @@ -166,7 +166,7 @@ void setInboundMessageSizeLarge() throws Exception { .build(); QueryOptions queryOptions = new QueryOptions("test"); - queryOptions.setGrpcCallOption(grpcCallOption); + queryOptions.setGrpcCallOptions(grpcCallOption); try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { Assertions.assertThatNoException().isThrownBy(() -> { @@ -181,7 +181,15 @@ void setInboundMessageSizeLarge() throws Exception { } @Test - void setGrpcCallOption() { + void defaultGrpcCallOptions() { + GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions(); + Assertions.assertThat(grpcCallOptions).isNotNull(); + Assertions.assertThat(grpcCallOptions.getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + Assertions.assertThat(grpcCallOptions.getCallOptions().length).isEqualTo(1); + } + + @Test + void setGrpcCallOptions() { Executor executor = Executors.newSingleThreadExecutor(); Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); String compressorName = "name"; @@ -195,19 +203,19 @@ void setGrpcCallOption() { .build(); QueryOptions options = new QueryOptions("test"); - options.setGrpcCallOption(grpcCallOption); - Assertions.assertThat(options.grpcCallOption()).isNotNull(); - Assertions.assertThat(options.grpcCallOption().getMaxInboundMessageSize()).isEqualTo(1024); - Assertions.assertThat(options.grpcCallOption().getMaxOutboundMessageSize()).isEqualTo(1024); - Assertions.assertThat(options.grpcCallOption().getExecutor()).isEqualTo(executor); - Assertions.assertThat(options.grpcCallOption().getWaitForReady()).isTrue(); - Assertions.assertThat(options.grpcCallOption().getCompressorName()).isEqualTo(compressorName); - Assertions.assertThat(options.grpcCallOption().getDeadline()).isEqualTo(deadline); + options.setGrpcCallOptions(grpcCallOption); + Assertions.assertThat(options.grpcCallOptions()).isNotNull(); + Assertions.assertThat(options.grpcCallOptions().getMaxInboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOptions().getMaxOutboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOptions().getExecutor()).isEqualTo(executor); + Assertions.assertThat(options.grpcCallOptions().getWaitForReady()).isTrue(); + Assertions.assertThat(options.grpcCallOptions().getCompressorName()).isEqualTo(compressorName); + Assertions.assertThat(options.grpcCallOptions().getDeadline()).isEqualTo(deadline); } @Test - void grpcCallOption() { + void grpcCallOptions() { Executor executor = Executors.newSingleThreadExecutor(); Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() From 282d5c6ec129e55b15181d8609a9d3a05b5d4e54 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 8 Apr 2025 15:19:44 +0700 Subject: [PATCH 23/23] refactor: not use stream --- .../v3/client/internal/GrpcCallOptions.java | 15 ++-- .../client/internal/InfluxDBClientImpl.java | 3 +- .../v3/client/query/QueryOptions.java | 3 +- .../client/internal/GrpcCallOptionsTest.java | 83 +++++++++++++++++++ 4 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java index eb87f9a4..8fd04e0d 100644 --- a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -22,12 +22,9 @@ package com.influxdb.v3.client.internal; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.Executor; -import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -72,9 +69,9 @@ private GrpcCallOptions(@Nonnull final Builder builder) { * * @return the default configuration of {@link GrpcCallOptions}. */ + @Nonnull public static GrpcCallOptions getDefaultOptions() { GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder(); - builder.withMaxInboundMessageSize(Integer.MAX_VALUE); return builder.build(); } @@ -89,10 +86,12 @@ public static GrpcCallOptions getDefaultOptions() { */ public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, final CallOption... callOptions) { - return Stream.concat( - Arrays.stream(Optional.ofNullable(baseCallOptions).orElse(new CallOption[0])), - Arrays.stream(Optional.ofNullable(callOptions).orElse(new CallOption[0])) - ).toArray(CallOption[]::new); + CallOption[] base = baseCallOptions != null ? baseCallOptions : new CallOption[0]; + CallOption[] additional = callOptions != null ? callOptions : new CallOption[0]; + CallOption[] merged = new CallOption[base.length + additional.length]; + System.arraycopy(base, 0, merged, 0, base.length); + System.arraycopy(additional, 0, merged, base.length, additional.length); + return merged; } /** diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index ce6c0227..aadf3ccf 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -342,8 +342,7 @@ private Stream queryData(@Nonnull final String query, } }); - CallOption[] callOptions = options.grpcCallOptions() != null - ? options.grpcCallOptions().getCallOptions() : null; + CallOption[] callOptions = options.grpcCallOptions().getCallOptions(); return flightSqlClient.execute( query, database, diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index f616b52c..7b88e039 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -151,13 +151,14 @@ public Map headersSafe() { * @param grpcCallOptions the grpcCallOptions */ public void setGrpcCallOptions(@Nonnull final GrpcCallOptions grpcCallOptions) { + Arguments.checkNotNull(grpcCallOptions, "grpcCallOptions"); this.grpcCallOptions = grpcCallOptions; } /** * @return the GrpcCallOptions object. */ - @Nullable + @Nonnull public GrpcCallOptions grpcCallOptions() { return grpcCallOptions; } diff --git a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java index 23286fa3..bbc0f2e2 100644 --- a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; class GrpcCallOptionsTest { @@ -118,4 +119,86 @@ public > T wrapStub(final T stub) { }; } + @Test + void testEqualsWithEqualObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + + assertEquals(options1, options2); + } + + @Test + void testEqualsWithDifferentObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1000) + .withCompressorName("deflate") + .build(); + + assertNotEquals(options1, options2); + } + + @Test + void testEqualsWithNullAndDifferentClass() { + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .build(); + + assertNotEquals(null, options); + assertNotEquals(1, options); + } + + @Test + void testHashCodeWithEqualObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + + assertEquals(options1.hashCode(), options2.hashCode()); + } + + @Test + void testHashCodeWithDifferentObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1000) + .withCompressorName("deflate") + .build(); + + assertNotEquals(options1.hashCode(), options2.hashCode()); + } + + @Test + void testToString() { + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withMaxOutboundMessageSize(5000) + .withCompressorName("gzip") + .build(); + + String expected = "GrpcCallOptions{deadline=null, " + + "executor=null, " + + "compressorName='gzip', " + + "waitForReady=null, " + + "maxInboundMessageSize=2000, " + + "maxOutboundMessageSize=5000}"; + assertEquals(expected, options.toString()); + } }