From 785fe69b1bdb52b1263051bfef8a0a1fd1b5a3ad Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Wed, 18 Oct 2023 19:22:02 +0000 Subject: [PATCH 1/9] Expose app_metadata on FlightInfo and FlightEndpoint --- .../apache/arrow/flight/FlightEndpoint.java | 30 +++++++++++++++-- .../org/apache/arrow/flight/FlightInfo.java | 32 +++++++++++++++++-- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index ad78cfbd210..c38a39cdca5 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -17,6 +17,7 @@ package org.apache.arrow.flight; +import com.google.protobuf.ByteString; import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -39,6 +40,7 @@ public class FlightEndpoint { private final List locations; private final Ticket ticket; private final Instant expirationTime; + private final String appMetadata; /** * Constructs a new endpoint with no expiration time. @@ -54,13 +56,27 @@ public FlightEndpoint(Ticket ticket, Location... locations) { * Constructs a new endpoint with an expiration time. * * @param ticket A ticket that describe the key of a data stream. + * @param expirationTime (optional) When this endpoint expires. * @param locations The possible locations the stream can be retrieved from. */ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) { + this(ticket, expirationTime, null, locations); + } + + /** + * Constructs a new endpoint with an expiration time. + * + * @param ticket A ticket that describe the key of a data stream. + * @param expirationTime (optional) When this endpoint expires. + * @param appMetadata (optional) Application metadata associated with this endpoint. + * @param locations The possible locations the stream can be retrieved from. + */ + public FlightEndpoint(Ticket ticket, Instant expirationTime, String appMetadata, Location... locations) { Objects.requireNonNull(ticket); this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); this.expirationTime = expirationTime; this.ticket = ticket; + this.appMetadata = appMetadata; } /** @@ -77,6 +93,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locatio } else { this.expirationTime = null; } + this.appMetadata = flt.getAppMetadata().toStringUtf8(); this.ticket = new Ticket(flt.getTicket()); } @@ -92,12 +109,17 @@ public Optional getExpirationTime() { return Optional.ofNullable(expirationTime); } + public String getAppMetadata() { + return appMetadata; + } + /** * Converts to the protocol buffer representation. */ Flight.FlightEndpoint toProtocol() { Flight.FlightEndpoint.Builder b = Flight.FlightEndpoint.newBuilder() - .setTicket(ticket.toProtocol()); + .setTicket(ticket.toProtocol()) + .setAppMetadata(ByteString.copyFromUtf8(appMetadata)); for (Location l : locations) { b.addLocation(l.toProtocol()); @@ -148,12 +170,13 @@ public boolean equals(Object o) { FlightEndpoint that = (FlightEndpoint) o; return locations.equals(that.locations) && ticket.equals(that.ticket) && - Objects.equals(expirationTime, that.expirationTime); + Objects.equals(expirationTime, that.expirationTime) && + Objects.equals(appMetadata, that.appMetadata); } @Override public int hashCode() { - return Objects.hash(locations, ticket, expirationTime); + return Objects.hash(locations, ticket, expirationTime, appMetadata); } @Override @@ -162,6 +185,7 @@ public String toString() { "locations=" + locations + ", ticket=" + ticket + ", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) + + ", appMetadata=" + appMetadata + '}'; } } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index d871f89465c..f9156c9011e 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -51,6 +51,7 @@ public class FlightInfo { private final long records; private final boolean ordered; private final IpcOption option; + private final String appMetadata; /** * Constructs a new instance. @@ -94,6 +95,23 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, boolean ordered, IpcOption option) { + this(schema, descriptor, endpoints, bytes, records, ordered, option, null); + } + + /** + * Constructs a new instance. + * + * @param schema The schema of the Flight + * @param descriptor An identifier for the Flight. + * @param endpoints A list of endpoints that have the flight available. + * @param bytes The number of bytes in the flight + * @param records The number of records in the flight. + * @param ordered Whether the endpoints in this flight are ordered. + * @param option IPC write options. + * @param appMetadata App metadata for this flight. + */ + public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, + long records, boolean ordered, IpcOption option, String appMetadata) { Objects.requireNonNull(descriptor); Objects.requireNonNull(endpoints); if (schema != null) { @@ -106,6 +124,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, boolean ordered, IpcOption option) { - this(schema, descriptor, endpoints, bytes, records, ordered, option, null); + this(schema, descriptor, endpoints, bytes, records, ordered, option, ""); } /** diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 238221f051a..95618fe3f90 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -113,10 +113,11 @@ public void roundTripInfo() throws Exception { Field.nullable("a", new ArrowType.Int(32, true)), Field.nullable("b", new ArrowType.FixedSizeBinary(32)) ), metadata); - final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), Collections.emptyList(), -1, -1); + final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), + Collections.emptyList(), -1, -1, false, IpcOption.DEFAULT, "foo"); final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]), Collections.singletonList(new FlightEndpoint( - new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500); + new Ticket(new byte[10]), null, "bar", Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500); final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"), Arrays.asList(new FlightEndpoint( new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")), diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java index 691048fb03e..80c717f3b16 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java @@ -28,6 +28,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.message.IpcOption; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -137,7 +138,7 @@ public void supportsNullSchemas() throws Exception @Override public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { - return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0); + return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0, false, IpcOption.DEFAULT, "foo"); } }; @@ -147,6 +148,7 @@ public FlightInfo getFlightInfo(CallContext context, FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test")); Assertions.assertEquals(Optional.empty(), flightInfo.getSchemaOptional()); Assertions.assertEquals(new Schema(Collections.emptyList()), flightInfo.getSchema()); + Assertions.assertEquals(flightInfo.getAppMetadata(), "foo"); Exception e = Assertions.assertThrows( FlightRuntimeException.class, From 23ae1ad1ae44b028b8c86af3039494ae68dee2b7 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Thu, 19 Oct 2023 00:22:58 +0000 Subject: [PATCH 3/9] Add FlightInfo builder --- .../org/apache/arrow/flight/FlightInfo.java | 90 ++++++++++++++++++- .../arrow/flight/TestBasicOperation.java | 4 +- 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index 0b681036904..7c544b5502d 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -108,7 +108,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, boolean ordered, IpcOption option, String appMetadata) { @@ -276,4 +276,92 @@ public String toString() { ", appMetadata=" + appMetadata + '}'; } + + /** + * Create a builder for FlightInfo. + * + * @param schema The schema of the Flight + * @param descriptor An identifier for the Flight. + * @param endpoints A list of endpoints that have the flight available. + */ + public static Builder builder(Schema schema, FlightDescriptor descriptor, List endpoints) { + return new Builder(schema, descriptor, endpoints); + } + + /** + * Builder for FlightInfo. + */ + public static final class Builder { + private final Schema schema; + private final FlightDescriptor descriptor; + private final List endpoints; + private long bytes = -1; + private long records = -1; + private boolean ordered = false; + private IpcOption option = IpcOption.DEFAULT; + private String appMetadata = ""; + + private Builder(Schema schema, FlightDescriptor descriptor, List endpoints) { + this.schema = schema; + this.descriptor = descriptor; + this.endpoints = endpoints; + } + + /** + * Set the number of bytes for the flight. Default to -1 for unknown. + * + * @param bytes The number of bytes in the flight + */ + public Builder setBytes(long bytes) { + this.bytes = bytes; + return this; + } + + /** + * Set the number of records for the flight. Default to -1 for unknown. + * + * @param records The number of records in the flight. + */ + public Builder setRecords(long records) { + this.records = records; + return this; + } + + /** + * Set whether the flight endpoints are ordered. Default is false. + * + * @param ordered Whether the endpoints in this flight are ordered. + */ + public Builder setOrdered(boolean ordered) { + this.ordered = ordered; + return this; + } + + /** + * Set IPC write options. Default is IpcOption.DEFAULT + * + * @param option IPC write options. + */ + public Builder setOption(IpcOption option) { + this.option = option; + return this; + } + + /** + * Set the app metadata to send along with the flight. Default is an empty string. + * + * @param appMetadata Metadata to send along with the flight + */ + public Builder setAppMetadata(String appMetadata) { + this.appMetadata = appMetadata; + return this; + } + + /** + * Build FlightInfo object. + */ + public FlightInfo build() { + return new FlightInfo(schema, descriptor, endpoints, bytes, records, ordered, option, appMetadata); + } + } } diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 95618fe3f90..56d43541029 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -113,8 +113,8 @@ public void roundTripInfo() throws Exception { Field.nullable("a", new ArrowType.Int(32, true)), Field.nullable("b", new ArrowType.FixedSizeBinary(32)) ), metadata); - final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), - Collections.emptyList(), -1, -1, false, IpcOption.DEFAULT, "foo"); + final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList()) + .setAppMetadata("foo").build(); final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]), Collections.singletonList(new FlightEndpoint( new Ticket(new byte[10]), null, "bar", Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500); From 18457765923c47a5fa2efbc37f615c96e3e876be Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Fri, 20 Oct 2023 20:20:49 +0000 Subject: [PATCH 4/9] Change type to byte[] --- .../apache/arrow/flight/FlightEndpoint.java | 24 ++++++++++------- .../org/apache/arrow/flight/FlightInfo.java | 26 +++++++++++-------- .../arrow/flight/TestBasicOperation.java | 7 ++--- .../arrow/flight/TestFlightService.java | 5 ++-- 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index 7fe53f089c4..0809aa646a0 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -40,7 +41,7 @@ public class FlightEndpoint { private final List locations; private final Ticket ticket; private final Instant expirationTime; - private final String appMetadata; + private final byte[] appMetadata; /** * Constructs a new endpoint with no expiration time. @@ -60,7 +61,7 @@ public FlightEndpoint(Ticket ticket, Location... locations) { * @param locations The possible locations the stream can be retrieved from. */ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) { - this(ticket, expirationTime, "", locations); + this(ticket, expirationTime, null, locations); } /** @@ -71,7 +72,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locatio * @param appMetadata (optional) Application metadata associated with this endpoint. * @param locations The possible locations the stream can be retrieved from. */ - public FlightEndpoint(Ticket ticket, Instant expirationTime, String appMetadata, Location... locations) { + public FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, Location... locations) { Objects.requireNonNull(ticket); this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); this.expirationTime = expirationTime; @@ -93,7 +94,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, String appMetadata, } else { this.expirationTime = null; } - this.appMetadata = flt.getAppMetadata().toStringUtf8(); + this.appMetadata = flt.getAppMetadata().toByteArray(); this.ticket = new Ticket(flt.getTicket()); } @@ -109,7 +110,7 @@ public Optional getExpirationTime() { return Optional.ofNullable(expirationTime); } - public String getAppMetadata() { + public byte[] getAppMetadata() { return appMetadata; } @@ -118,8 +119,7 @@ public String getAppMetadata() { */ Flight.FlightEndpoint toProtocol() { Flight.FlightEndpoint.Builder b = Flight.FlightEndpoint.newBuilder() - .setTicket(ticket.toProtocol()) - .setAppMetadata(ByteString.copyFromUtf8(appMetadata)); + .setTicket(ticket.toProtocol()); for (Location l : locations) { b.addLocation(l.toProtocol()); @@ -133,6 +133,10 @@ Flight.FlightEndpoint toProtocol() { .build()); } + if (appMetadata != null) { + b.setAppMetadata(ByteString.copyFrom(appMetadata)); + } + return b.build(); } @@ -171,12 +175,12 @@ public boolean equals(Object o) { return locations.equals(that.locations) && ticket.equals(that.ticket) && Objects.equals(expirationTime, that.expirationTime) && - Objects.equals(appMetadata, that.appMetadata); + Arrays.equals(appMetadata, that.appMetadata); } @Override public int hashCode() { - return Objects.hash(locations, ticket, expirationTime, appMetadata); + return Objects.hash(locations, ticket, expirationTime, Arrays.hashCode(appMetadata)); } @Override @@ -185,7 +189,7 @@ public String toString() { "locations=" + locations + ", ticket=" + ticket + ", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) + - ", appMetadata=" + appMetadata + + ", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) + '}'; } } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index 7c544b5502d..0fb64f04a37 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -51,7 +53,7 @@ public class FlightInfo { private final long records; private final boolean ordered; private final IpcOption option; - private final String appMetadata; + private final byte[] appMetadata; /** * Constructs a new instance. @@ -95,7 +97,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, boolean ordered, IpcOption option) { - this(schema, descriptor, endpoints, bytes, records, ordered, option, ""); + this(schema, descriptor, endpoints, bytes, records, ordered, option, null); } /** @@ -111,7 +113,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, - long records, boolean ordered, IpcOption option, String appMetadata) { + long records, boolean ordered, IpcOption option, byte[] appMetadata) { Objects.requireNonNull(descriptor); Objects.requireNonNull(endpoints); if (schema != null) { @@ -150,7 +152,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints) { this.schema = schema; @@ -352,7 +356,7 @@ public Builder setOption(IpcOption option) { * * @param appMetadata Metadata to send along with the flight */ - public Builder setAppMetadata(String appMetadata) { + public Builder setAppMetadata(byte[] appMetadata) { this.appMetadata = appMetadata; return this; } diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 56d43541029..9aee166484b 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -114,10 +114,11 @@ public void roundTripInfo() throws Exception { Field.nullable("b", new ArrowType.FixedSizeBinary(32)) ), metadata); final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList()) - .setAppMetadata("foo").build(); + .setAppMetadata("foo".getBytes()).build(); final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]), Collections.singletonList(new FlightEndpoint( - new Ticket(new byte[10]), null, "bar", Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500); + new Ticket(new byte[10]), null, "bar".getBytes(), Location.forGrpcDomainSocket("/tmp/test.sock")) + ), 200, 500); final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"), Arrays.asList(new FlightEndpoint( new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")), @@ -428,7 +429,7 @@ public void testProtobufSchemaCompatibility() throws Exception { .build(); Assertions.assertEquals(0, protobufData.getDataBody().size()); final ArrowMessage parsedMessage = marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray())); - // Should have no body buffers + // Should have no body buffers5 Assertions.assertFalse(parsedMessage.getBufs().iterator().hasNext()); // Should not throw parsedMessage.asSchema(); diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java index 80c717f3b16..853e729555c 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java @@ -138,7 +138,8 @@ public void supportsNullSchemas() throws Exception @Override public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { - return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0, false, IpcOption.DEFAULT, "foo"); + return new FlightInfo(null, descriptor, Collections.emptyList(), + 0, 0, false, IpcOption.DEFAULT, "foo".getBytes()); } }; @@ -148,7 +149,7 @@ public FlightInfo getFlightInfo(CallContext context, FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test")); Assertions.assertEquals(Optional.empty(), flightInfo.getSchemaOptional()); Assertions.assertEquals(new Schema(Collections.emptyList()), flightInfo.getSchema()); - Assertions.assertEquals(flightInfo.getAppMetadata(), "foo"); + Assertions.assertEquals(flightInfo.getAppMetadata(), "foo".getBytes()); Exception e = Assertions.assertThrows( FlightRuntimeException.class, From 8551bff62a9e928556f151f1bf99fe5c93c930d6 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Fri, 20 Oct 2023 21:18:16 +0000 Subject: [PATCH 5/9] Change to byte[] --- .../src/main/java/org/apache/arrow/flight/FlightEndpoint.java | 2 +- .../src/main/java/org/apache/arrow/flight/FlightInfo.java | 3 +-- .../test/java/org/apache/arrow/flight/TestFlightService.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index 0809aa646a0..302c1f42f1a 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -94,7 +94,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, } else { this.expirationTime = null; } - this.appMetadata = flt.getAppMetadata().toByteArray(); + this.appMetadata = (flt.getAppMetadata().size() == 0 ? null : flt.getAppMetadata().toByteArray()); this.ticket = new Ticket(flt.getTicket()); } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index 0fb64f04a37..0bc1505cd43 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -152,8 +152,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List Date: Fri, 20 Oct 2023 22:01:50 +0000 Subject: [PATCH 6/9] Builder for FlightEndpoint --- .../apache/arrow/flight/FlightEndpoint.java | 65 ++++++++++++++++--- .../org/apache/arrow/flight/FlightInfo.java | 2 +- .../arrow/flight/TestBasicOperation.java | 5 +- 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index 302c1f42f1a..1967fe1d91c 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -61,20 +61,15 @@ public FlightEndpoint(Ticket ticket, Location... locations) { * @param locations The possible locations the stream can be retrieved from. */ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) { - this(ticket, expirationTime, null, locations); + this(ticket, expirationTime, null, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)))); } /** - * Constructs a new endpoint with an expiration time. - * - * @param ticket A ticket that describe the key of a data stream. - * @param expirationTime (optional) When this endpoint expires. - * @param appMetadata (optional) Application metadata associated with this endpoint. - * @param locations The possible locations the stream can be retrieved from. + * Private constructor with all parameters. Should only be called by Builder. */ - public FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, Location... locations) { + private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, List locations) { Objects.requireNonNull(ticket); - this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); + this.locations = locations; this.expirationTime = expirationTime; this.ticket = ticket; this.appMetadata = appMetadata; @@ -192,4 +187,56 @@ public String toString() { ", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) + '}'; } + + /** + * Create a builder for FlightEndpoint. + * + * @param ticket A ticket that describe the key of a data stream. + * @param locations The possible locations the stream can be retrieved from. + */ + public static Builder builder(Ticket ticket, Location... locations) { + return new Builder(ticket, locations); + } + + /** + * Builder for FlightEndpoint. + */ + public static final class Builder { + private final Ticket ticket; + private final List locations; + private Instant expirationTime = null; + private byte[] appMetadata = null; + + private Builder(Ticket ticket, Location... locations) { + this.ticket = ticket; + this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); + } + + /** + * Set expiration time for the endpoint. Default is null, which means don't expire. + * + * @param expirationTime (optional) When this endpoint expires. + */ + public Builder setExpirationTime(Instant expirationTime) { + this.expirationTime = expirationTime; + return this; + } + + /** + * Set the app metadata to send along with the flight. Default is null; + * + * @param appMetadata Metadata to send along with the flight + */ + public Builder setAppMetadata(byte[] appMetadata) { + this.appMetadata = appMetadata; + return this; + } + + /** + * Build FlightEndpoint object. + */ + public FlightEndpoint build() { + return new FlightEndpoint(ticket, expirationTime, appMetadata, locations); + } + } } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index 0bc1505cd43..b5279a304c8 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -351,7 +351,7 @@ public Builder setOption(IpcOption option) { } /** - * Set the app metadata to send along with the flight. Default is an empty string. + * Set the app metadata to send along with the flight. Default is null. * * @param appMetadata Metadata to send along with the flight */ diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 9aee166484b..a534a847006 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -116,8 +116,9 @@ public void roundTripInfo() throws Exception { final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList()) .setAppMetadata("foo".getBytes()).build(); final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]), - Collections.singletonList(new FlightEndpoint( - new Ticket(new byte[10]), null, "bar".getBytes(), Location.forGrpcDomainSocket("/tmp/test.sock")) + Collections.singletonList( + FlightEndpoint.builder(new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")) + .setAppMetadata("bar".getBytes()).build() ), 200, 500); final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"), Arrays.asList(new FlightEndpoint( From 950ba1da32de1bd92fd6a7bc7fa461cd7ad95281 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Fri, 20 Oct 2023 22:08:32 +0000 Subject: [PATCH 7/9] . --- .../test/java/org/apache/arrow/flight/TestBasicOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index a534a847006..41b3a4693e5 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -430,7 +430,7 @@ public void testProtobufSchemaCompatibility() throws Exception { .build(); Assertions.assertEquals(0, protobufData.getDataBody().size()); final ArrowMessage parsedMessage = marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray())); - // Should have no body buffers5 + // Should have no body buffers Assertions.assertFalse(parsedMessage.getBufs().iterator().hasNext()); // Should not throw parsedMessage.asSchema(); From 73d1c85dc86eb9b27977978ab1a60c3839b3e275 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Mon, 23 Oct 2023 18:59:47 +0000 Subject: [PATCH 8/9] Enable app_metadata_flight_info_endpoint for Java --- dev/archery/archery/integration/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index eb2e26951cd..b44e349989f 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -614,7 +614,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, Scenario( "app_metadata_flight_info_endpoint", description="Ensure support FlightInfo and Endpoint app_metadata", - skip_testers={"JS", "C#", "Rust", "Java"} + skip_testers={"JS", "C#", "Rust"} ), Scenario( "flight_sql", From 7db4c8e8cdb05453e2c097df50207ba0013ffa92 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Tue, 24 Oct 2023 19:27:29 +0000 Subject: [PATCH 9/9] Add integ test --- ...AppMetadataFlightInfoEndpointScenario.java | 76 +++++++++++++++++++ .../flight/integration/tests/Scenarios.java | 1 + .../integration/tests/IntegrationTest.java | 5 ++ 3 files changed, 82 insertions(+) create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java new file mode 100644 index 00000000000..3220bb5a2d2 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** Test app_metadata in FlightInfo and FlightEndpoint. */ +final class AppMetadataFlightInfoEndpointScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new AppMetadataFlightInfoEndpointProducer(); + } + + @Override + public void buildServer(FlightServer.Builder builder) throws Exception { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + byte[] cmd = "foobar".getBytes(StandardCharsets.UTF_8); + FlightInfo info = client.getInfo(FlightDescriptor.command(cmd)); + IntegrationAssertions.assertEquals(info.getAppMetadata(), cmd); + IntegrationAssertions.assertEquals(info.getEndpoints().size(), 1); + IntegrationAssertions.assertEquals(info.getEndpoints().get(0).getAppMetadata(), cmd); + } + + /** producer for app_metadata test. */ + static class AppMetadataFlightInfoEndpointProducer extends NoOpFlightProducer { + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + byte[] cmd = descriptor.getCommand(); + + Schema schema = new Schema( + Collections.singletonList(Field.notNullable("number", Types.MinorType.UINT4.getType()))); + + List endpoints = Collections.singletonList( + FlightEndpoint.builder( + new Ticket("".getBytes(StandardCharsets.UTF_8))).setAppMetadata(cmd).build()); + + return FlightInfo.builder(schema, descriptor, endpoints).setAppMetadata(cmd).build(); + } + } +} + + diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java index 26629c650e3..c61fd94a4d2 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java @@ -49,6 +49,7 @@ private Scenarios() { scenarios.put("poll_flight_info", PollFlightInfoScenario::new); scenarios.put("flight_sql", FlightSqlScenario::new); scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new); + scenarios.put("app_metadata_flight_info_endpoint", AppMetadataFlightInfoEndpointScenario::new); } private static Scenarios getInstance() { diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index cf65e16fac0..477a56055cb 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -78,6 +78,11 @@ void flightSqlExtension() throws Exception { testScenario("flight_sql:extension"); } + @Test + void appMetadataFlightInfoEndpoint() throws Exception { + testScenario("app_metadata_flight_info_endpoint"); + } + void testScenario(String scenarioName) throws Exception { try (final BufferAllocator allocator = new RootAllocator()) { final FlightServer.Builder builder = FlightServer.builder()