diff --git a/CHANGELOG.md b/CHANGELOG.md index 2312896a..3c979fdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ ### Features 1. [#229](https://github.com/InfluxCommunity/influxdb3-java/pull/229): Support proxy and custom ssl root certificates -2. [#233](https://github.com/InfluxCommunity/influxdb3-java/pull/233): More detailed documentation about timestamp handling for query and write functions +2. [#232](https://github.com/InfluxCommunity/influxdb3-java/pull/232): Allow set rpc max message size through maxInboundMessageSize in ClientConfig +3. [#233](https://github.com/InfluxCommunity/influxdb3-java/pull/233): More detailed documentation about timestamp handling for query and write functions ## 1.0.0 [2024-12-11] diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index fc248e30..ad6a9d4f 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -57,10 +57,10 @@ *
  • disableServerCertificateValidation - * disable server certificate validation for HTTPS connections *
  • - *
  • proxyUrl - Proxy url for query api and write api
  • + *
  • proxyUrl - proxy url for query api and write api
  • *
  • authenticator - HTTP proxy authenticator
  • *
  • headers - headers to be added to requests
  • - *
  • sslRootsFilePath - Path to the stored certificates file in PEM format
  • + *
  • sslRootsFilePath - path to the stored certificates file in PEM format
  • * *

    * If you want to create a client with custom configuration, you can use following code: diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index e1b37298..697693fb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -49,6 +49,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightGrpcUtils; import org.apache.arrow.flight.FlightStream; @@ -106,7 +107,8 @@ Stream execute(@Nonnull final String query, @Nonnull final String database, @Nonnull final QueryType queryType, @Nonnull final Map queryParameters, - @Nonnull final Map headers) { + @Nonnull final Map headers, + final CallOption... callOptions) { Map ticketData = new HashMap<>() {{ put("database", database); @@ -126,8 +128,10 @@ Stream execute(@Nonnull final String query, } HeaderCallOption headerCallOption = metadataHeader(headers); + CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption); + Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - FlightStream stream = client.getStream(ticket, headerCallOption); + FlightStream stream = client.getStream(ticket, callOptionArray); FlightSqlIterator iterator = new FlightSqlIterator(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); @@ -141,11 +145,10 @@ public void close() throws Exception { @Nonnull private FlightClient createFlightClient(@Nonnull final ClientConfig config) { - Location location = createLocation(config); - - final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forTarget(location.getUri().getHost()); + URI uri = createLocation(config).getUri(); + final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()); - if (LocationSchemes.GRPC_TLS.equals(location.getUri().getScheme())) { + if (LocationSchemes.GRPC_TLS.equals(uri.getScheme())) { nettyChannelBuilder.useTransportSecurity(); SslContext nettySslContext = createNettySslContext(config); @@ -164,7 +167,6 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { } nettyChannelBuilder.maxTraceEvents(0) - .maxInboundMessageSize(Integer.MAX_VALUE) .maxInboundMetadataSize(Integer.MAX_VALUE); return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build()); diff --git a/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java new file mode 100644 index 00000000..8fd04e0d --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java @@ -0,0 +1,400 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executor; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.grpc.CompressorRegistry; +import io.grpc.Deadline; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.AbstractStub; +import org.apache.arrow.flight.CallOption; + +/** + * The collection of runtime options for a new RPC call. + */ +public final class GrpcCallOptions { + + private final Deadline deadline; + private final Executor executor; + private final String compressorName; + private final Boolean waitForReady; + private final Integer maxInboundMessageSize; + private final Integer maxOutboundMessageSize; + private final CallOption[] callOptions; + + private GrpcCallOptions(@Nonnull final Builder builder) { + this.deadline = builder.deadline; + this.executor = builder.executor; + this.compressorName = builder.compressorName; + this.waitForReady = builder.waitForReady; + this.maxInboundMessageSize = builder.maxInboundMessageSize; + this.maxOutboundMessageSize = builder.maxOutboundMessageSize; + this.callOptions = builder.callOptions.toArray(new CallOption[0]); + } + + + /** + * Creates a default instance of {@link GrpcCallOptions} with predefined settings. + *

    + * The default configuration includes: + *

    + * Other options can be customized using the {@link GrpcCallOptions.Builder}. + * + * @return the default configuration of {@link GrpcCallOptions}. + */ + @Nonnull + public static GrpcCallOptions getDefaultOptions() { + GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder(); + return builder.build(); + } + + /** + * Merges two arrays of {@link CallOption} into a single array. The method combines the elements + * from the baseCallOptions array and the additional callOptions array. If either of the input + * arrays is null, it will be treated as an empty array. + * + * @param baseCallOptions the base array of {@link CallOption} instances, may be null + * @param callOptions additional {@link CallOption} instances to be added, may also be null + * @return a combined array containing all {@link CallOption} instances from both input arrays + */ + public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, + final CallOption... callOptions) { + CallOption[] base = baseCallOptions != null ? baseCallOptions : new CallOption[0]; + CallOption[] additional = callOptions != null ? callOptions : new CallOption[0]; + CallOption[] merged = new CallOption[base.length + additional.length]; + System.arraycopy(base, 0, merged, 0, base.length); + System.arraycopy(additional, 0, merged, base.length, additional.length); + return merged; + } + + /** + * Returns the absolute deadline for a call. + * + * @return the Deadline object + */ + @Nullable + public Deadline getDeadline() { + return deadline; + } + + /** + * Returns the Executor to be used instead of default. + * + * @return the Executor + */ + @Nullable + public Executor getExecutor() { + return executor; + } + + /** + * Returns the compressor's name. + * + * @return the compressor's name + */ + @Nullable + public String getCompressorName() { + return compressorName; + } + + /** + * Returns the wait for ready flag. + * + * @return the wait for ready flag + */ + @Nullable + public Boolean getWaitForReady() { + return waitForReady; + } + + /** + * Returns the maximum allowed message size acceptable from the remote peer. + * + * @return the maximum message size receive allowed + */ + @Nullable + public Integer getMaxInboundMessageSize() { + return maxInboundMessageSize; + } + + /** + * Returns the maximum allowed message size acceptable to send the remote peer. + * + * @return the maximum message size send allowed + */ + @Nullable + public Integer getMaxOutboundMessageSize() { + return maxOutboundMessageSize; + } + + /** + * Get the CallOption callback list which is use when setting + * the grpc CallOption. + * + * @return the CallOption list + */ + @Nonnull + public CallOption[] getCallOptions() { + return callOptions; + } + + @Override + public boolean equals(final Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + GrpcCallOptions that = (GrpcCallOptions) o; + return Objects.equals(deadline, that.deadline) + && Objects.equals(executor, that.executor) + && Objects.equals(compressorName, that.compressorName) + && Objects.equals(waitForReady, that.waitForReady) + && Objects.equals(maxInboundMessageSize, that.maxInboundMessageSize) + && Objects.equals(maxOutboundMessageSize, that.maxOutboundMessageSize); + } + + @Override + public int hashCode() { + return Objects.hash(deadline, + executor, + compressorName, + waitForReady, + maxInboundMessageSize, + maxOutboundMessageSize + ); + } + + @Override + public String toString() { + return "GrpcCallOptions{" + + "deadline=" + deadline + + ", executor=" + executor + + ", compressorName='" + compressorName + + '\'' + + ", waitForReady=" + waitForReady + + ", maxInboundMessageSize=" + maxInboundMessageSize + + ", maxOutboundMessageSize=" + maxOutboundMessageSize + + '}'; + } + + /** + * Builder for GrpcCallOption. + */ + public static final class Builder { + private Deadline deadline; + private Executor executor; + private String compressorName; + private Boolean waitForReady; + private Integer maxInboundMessageSize; + private Integer maxOutboundMessageSize; + private final List callOptions = new ArrayList<>(); + + /** + * Constructs a new instance of the Builder with default values. + * By default, the maximum inbound message size is set to the largest possible value. + */ + public Builder() { + this.maxInboundMessageSize = Integer.MAX_VALUE; + } + + /** + * Sets the absolute deadline for a rpc call. + * + * @param deadline The deadline + * @return this + */ + public Builder withDeadline(final @Nonnull Deadline deadline) { + this.deadline = deadline; + return this; + } + + /** + * Sets an {@code executor} to be used instead of the default + * executor specified with {@link ManagedChannelBuilder#executor}. + * + * @param executor The executor + * @return this + */ + public Builder withExecutor(@Nonnull final Executor executor) { + this.executor = executor; + return this; + } + + /** + * Sets the compression to use for the call. The compressor must be a valid name known in the + * {@link CompressorRegistry}. By default, the "gzip" compressor will be available. + * + *

    It is only safe to call this if the server supports the compression format chosen. There is + * no negotiation performed; if the server does not support the compression chosen, the call will + * fail. + * + * @param compressorName The compressor name + * @return this + */ + public Builder withCompressorName(@Nonnull final String compressorName) { + this.compressorName = compressorName; + return this; + } + + /** + * Enables + * 'wait for ready' for the call. Wait-for-ready queues the RPC until a connection is + * available. This may dramatically increase the latency of the RPC, but avoids failing + * "unnecessarily." The default queues the RPC until an attempt to connect has completed, but + * fails RPCs without sending them if unable to connect. + * + * @return this + */ + public Builder withWaitForReady() { + this.waitForReady = true; + return this; + } + + /** + * Sets the maximum allowed message size acceptable from the remote peer. If unset, this will + * default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}. + * + * @param maxInboundMessageSize The max receive message size + * @return this + */ + public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessageSize) { + this.maxInboundMessageSize = maxInboundMessageSize; + return this; + } + + /** + * Sets the maximum allowed message size acceptable sent to the remote peer. + * + * @param maxOutboundMessageSize The maximum message send size + * @return this + */ + public Builder withMaxOutboundMessageSize(@Nonnull final Integer maxOutboundMessageSize) { + this.maxOutboundMessageSize = maxOutboundMessageSize; + return this; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createDeadlineCallOption( + final Deadline deadline) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withDeadline(deadline); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createExecutorCallOption( + final Executor executor) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withExecutor(executor); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createCompressionCallOption( + final String compressorName) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withCompression(compressorName); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createWaitForReadyCallOption() { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withWaitForReady(); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createMaxInboundMessageSizeCallOption( + final Integer maxInboundMessageSize) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withMaxInboundMessageSize(maxInboundMessageSize); + } + }; + } + + private org.apache.arrow.flight.CallOptions.GrpcCallOption createMaxOutboundMessageSizeCallOption( + final Integer maxOutboundMessageSize) { + return new org.apache.arrow.flight.CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withMaxOutboundMessageSize(maxOutboundMessageSize); + } + }; + } + + /** + * Build an instance of GrpcCallOptions. + * + * @return the GrpcCallOptions instance + */ + public GrpcCallOptions build() { + if (deadline != null) { + var callOption = createDeadlineCallOption(deadline); + callOptions.add(callOption); + } + + if (executor != null) { + var callOption = createExecutorCallOption(executor); + callOptions.add(callOption); + } + + if (compressorName != null) { + var callOption = createCompressionCallOption(compressorName); + callOptions.add(callOption); + } + + if (waitForReady != null) { + var callOption = createWaitForReadyCallOption(); + callOptions.add(callOption); + } + + if (maxInboundMessageSize != null) { + var callOption = createMaxInboundMessageSizeCallOption(maxInboundMessageSize); + callOptions.add(callOption); + } + + if (maxOutboundMessageSize != null) { + var callOption = createMaxOutboundMessageSizeCallOption(maxOutboundMessageSize); + callOptions.add(callOption); + } + + return new GrpcCallOptions(this); + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 6d67b7d0..aadf3ccf 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import io.netty.handler.codec.http.HttpMethod; +import org.apache.arrow.flight.CallOption; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -341,7 +342,15 @@ private Stream queryData(@Nonnull final String query, } }); - return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe()); + CallOption[] callOptions = options.grpcCallOptions().getCallOptions(); + return flightSqlClient.execute( + query, + database, + options.queryTypeSafe(), + parameters, + options.headersSafe(), + callOptions + ); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index 8ddf88e8..7b88e039 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -28,6 +28,7 @@ import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.Arguments; +import com.influxdb.v3.client.internal.GrpcCallOptions; /** * Query API options. @@ -61,6 +62,7 @@ public final class QueryOptions { private final String database; private final QueryType queryType; private final Map headers; + private GrpcCallOptions grpcCallOptions = GrpcCallOptions.getDefaultOptions(); /** * Construct QueryAPI options. The query type is set to SQL. @@ -144,6 +146,23 @@ public Map headersSafe() { return headers; } + /** + * Sets the GrpcCallOptions object. + * @param grpcCallOptions the grpcCallOptions + */ + public void setGrpcCallOptions(@Nonnull final GrpcCallOptions grpcCallOptions) { + Arguments.checkNotNull(grpcCallOptions, "grpcCallOptions"); + this.grpcCallOptions = grpcCallOptions; + } + + /** + * @return the GrpcCallOptions object. + */ + @Nonnull + public GrpcCallOptions grpcCallOptions() { + return grpcCallOptions; + } + private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 319d335c..d2d772f4 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -63,9 +63,9 @@ void hashConfig() { Assertions.assertThat(config.hashCode()).isEqualTo(configBuilder.build().hashCode()); Assertions.assertThat(config.hashCode()) - .isNotEqualTo(configBuilder.database("database").build().hashCode()); + .isNotEqualTo(configBuilder.database("database").build().hashCode()); Assertions.assertThat(config.hashCode()) - .isNotEqualTo(configBuilder.defaultTags(defaultTags).build().hashCode()); + .isNotEqualTo(configBuilder.defaultTags(defaultTags).build().hashCode()); } @Test diff --git a/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java new file mode 100644 index 00000000..bbc0f2e2 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java @@ -0,0 +1,204 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import io.grpc.stub.AbstractStub; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallOptions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class GrpcCallOptionsTest { + + @Test + void testNotSetMaxInboundMessageSize() { + GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder().build(); + assertNotNull(grpcCallOptions); + assertEquals(Integer.MAX_VALUE, grpcCallOptions.getMaxInboundMessageSize()); + } + + @Test + void testSetMaxInboundMessageSize() { + GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .build(); + assertNotNull(grpcCallOptions); + assertEquals(2000, grpcCallOptions.getMaxInboundMessageSize()); + } + + @Test + void testMergeCallOptionsWithBothNonNullArrays() { + CallOption option1 = callOption(); + CallOption option2 = callOption(); + CallOption[] baseCallOptions = {option1}; + CallOption[] additionalCallOptions = {option2}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(2, result.length); + assertEquals(option1, result[0]); + assertEquals(option2, result[1]); + } + + @Test + void testMergeCallOptionsWithBaseCallOptionsNull() { + CallOption option1 = callOption(); + CallOption option2 = callOption(); + CallOption[] baseCallOptions = null; + CallOption[] additionalCallOptions = {option1, option2}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(2, result.length); + assertEquals(option1, result[0]); + assertEquals(option2, result[1]); + } + + @Test + void testMergeCallOptionsWithAdditionalCallOptionsNull() { + CallOption option1 = callOption(); + CallOption[] baseCallOptions = {option1}; + CallOption[] additionalCallOptions = null; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(1, result.length); + assertEquals(option1, result[0]); + } + + @Test + void testMergeCallOptionsWithBothArraysNull() { + CallOption[] result = GrpcCallOptions.mergeCallOptions(null, null); + + assertNotNull(result); + assertEquals(0, result.length); + } + + @Test + void testMergeCallOptionsWithEmptyArrays() { + CallOption[] baseCallOptions = {}; + CallOption[] additionalCallOptions = {}; + + CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions); + + assertNotNull(result); + assertEquals(0, result.length); + } + + private CallOption callOption() { + return new CallOptions.GrpcCallOption() { + @Override + public > T wrapStub(final T stub) { + return stub.withMaxInboundMessageSize(Integer.MAX_VALUE); + } + }; + } + + @Test + void testEqualsWithEqualObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + + assertEquals(options1, options2); + } + + @Test + void testEqualsWithDifferentObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1000) + .withCompressorName("deflate") + .build(); + + assertNotEquals(options1, options2); + } + + @Test + void testEqualsWithNullAndDifferentClass() { + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .build(); + + assertNotEquals(null, options); + assertNotEquals(1, options); + } + + @Test + void testHashCodeWithEqualObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + + assertEquals(options1.hashCode(), options2.hashCode()); + } + + @Test + void testHashCodeWithDifferentObjects() { + GrpcCallOptions options1 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withCompressorName("gzip") + .build(); + GrpcCallOptions options2 = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1000) + .withCompressorName("deflate") + .build(); + + assertNotEquals(options1.hashCode(), options2.hashCode()); + } + + @Test + void testToString() { + GrpcCallOptions options = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(2000) + .withMaxOutboundMessageSize(5000) + .withCompressorName("gzip") + .build(); + + String expected = "GrpcCallOptions{deadline=null, " + + "executor=null, " + + "compressorName='gzip', " + + "waitForReady=null, " + + "maxInboundMessageSize=2000, " + + "maxOutboundMessageSize=5000}"; + assertEquals(expected, options.toString()); + } +} diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 3040ef70..fe94db3e 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -21,11 +21,44 @@ */ package com.influxdb.v3.client.query; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import javax.annotation.Nonnull; + +import io.grpc.Deadline; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallOptions; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.impl.FlightServiceGrpc; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.internal.GrpcCallOptions; class QueryOptionsTest { @@ -73,4 +106,185 @@ void optionsOverrideQueryType() { Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(options.queryTypeSafe()).isEqualTo(QueryType.InfluxQL); } + + @Test + void setInboundMessageSizeSmall() throws Exception { + URI uri = URI.create("http://127.0.0.1:33333"); + int rowCount = 100; + try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig.Builder builder = new ClientConfig.Builder() + .host(host) + .database("test"); + + // Set very small message size for testing + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(200) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(builder.build())) { + try (Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"", + queryOptions + )) { + try { + Assertions.assertThatThrownBy(stream::count); + } catch (FlightRuntimeException e) { + Assertions.assertThat(e.status().code()).isEqualTo(CallStatus.RESOURCE_EXHAUSTED.code()); + } + } + } + } + } + + @Test + void setInboundMessageSizeLarge() throws Exception { + URI uri = URI.create("http://127.0.0.1:33333"); + int rowCount = 100; + try (VectorSchemaRoot vectorSchemaRoot = generateVectorSchemaRoot(10, rowCount); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = simpleFlightServer(uri, allocator, simpleProducer(vectorSchemaRoot)) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .build(); + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024 * 1024 * 1024) + .build(); + + QueryOptions queryOptions = new QueryOptions("test"); + queryOptions.setGrpcCallOptions(grpcCallOption); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Assertions.assertThatNoException().isThrownBy(() -> { + Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"", + queryOptions); + Assertions.assertThat(stream.count()).isEqualTo(rowCount); + stream.close(); + }); + } + } + } + + @Test + void defaultGrpcCallOptions() { + GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions(); + Assertions.assertThat(grpcCallOptions).isNotNull(); + Assertions.assertThat(grpcCallOptions.getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + Assertions.assertThat(grpcCallOptions.getCallOptions().length).isEqualTo(1); + } + + @Test + void setGrpcCallOptions() { + Executor executor = Executors.newSingleThreadExecutor(); + Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); + String compressorName = "name"; + + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder().withExecutor(executor) + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withWaitForReady() + .withDeadline(deadline) + .withCompressorName(compressorName) + .build(); + + QueryOptions options = new QueryOptions("test"); + options.setGrpcCallOptions(grpcCallOption); + Assertions.assertThat(options.grpcCallOptions()).isNotNull(); + Assertions.assertThat(options.grpcCallOptions().getMaxInboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOptions().getMaxOutboundMessageSize()).isEqualTo(1024); + Assertions.assertThat(options.grpcCallOptions().getExecutor()).isEqualTo(executor); + Assertions.assertThat(options.grpcCallOptions().getWaitForReady()).isTrue(); + Assertions.assertThat(options.grpcCallOptions().getCompressorName()).isEqualTo(compressorName); + Assertions.assertThat(options.grpcCallOptions().getDeadline()).isEqualTo(deadline); + + } + + @Test + void grpcCallOptions() { + Executor executor = Executors.newSingleThreadExecutor(); + Deadline deadline = Deadline.after(2, TimeUnit.SECONDS); + GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder() + .withMaxInboundMessageSize(1024) + .withMaxOutboundMessageSize(1024) + .withCompressorName("my-compressor") + .withWaitForReady() + .withExecutor(executor) + .withDeadline(deadline) + .build(); + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) + .usePlaintext() + .build(); + FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); + for (CallOption option : grpcCallOption.getCallOptions()) { + stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); + } + + io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); + Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()) + .isEqualTo(grpcCallOption.getMaxInboundMessageSize()); + Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()) + .isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); + Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); + Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); + Assertions.assertThat(stubCallOptions.getExecutor()).isEqualTo(grpcCallOption.getExecutor()); + Assertions.assertThat(stubCallOptions.getDeadline()).isEqualTo(grpcCallOption.getDeadline()); + } + + private FlightServer simpleFlightServer(@Nonnull final URI uri, + @Nonnull final BufferAllocator allocator, + @Nonnull final NoOpFlightProducer producer) throws Exception { + Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort()); + return FlightServer.builder(allocator, location, producer).build(); + } + + private NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) { + return new NoOpFlightProducer() { + @Override + public void getStream(final CallContext context, + final Ticket ticket, + final ServerStreamListener listener) { + listener.start(vectorSchemaRoot); + if (listener.isReady()) { + listener.putNext(); + } + listener.completed(); + } + }; + } + + private VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) { + List fields = new ArrayList<>(); + for (int i = 0; i < fieldCount; i++) { + Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null); + fields.add(field); + } + + Schema schema = new Schema(fields); + VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); + for (Field field : fields) { + VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field); + vector.allocateNew(rowCount); + for (int i = 0; i < rowCount; i++) { + vector.set(i, "Value".getBytes(StandardCharsets.UTF_8)); + } + } + vectorSchemaRoot.setRowCount(rowCount); + + return vectorSchemaRoot; + } }