Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
Scenario(
"app_metadata_flight_info_endpoint",
description="Ensure support FlightInfo and Endpoint app_metadata",
skip_testers={"JS", "C#", "Rust", "Java"}
skip_testers={"JS", "C#", "Rust"}
),
Scenario(
"flight_sql",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.arrow.flight.impl.Flight;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;

/**
Expand All @@ -39,6 +41,7 @@ public class FlightEndpoint {
private final List<Location> locations;
private final Ticket ticket;
private final Instant expirationTime;
private final byte[] appMetadata;

/**
* Constructs a new endpoint with no expiration time.
Expand All @@ -54,13 +57,22 @@ public FlightEndpoint(Ticket ticket, Location... locations) {
* Constructs a new endpoint with an expiration time.
*
* @param ticket A ticket that describe the key of a data stream.
* @param expirationTime (optional) When this endpoint expires.
* @param locations The possible locations the stream can be retrieved from.
*/
public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) {
this(ticket, expirationTime, null, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))));
}

/**
* Private constructor with all parameters. Should only be called by Builder.
*/
private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, List<Location> locations) {
Objects.requireNonNull(ticket);
this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)));
this.locations = locations;
this.expirationTime = expirationTime;
this.ticket = ticket;
this.appMetadata = appMetadata;
}

/**
Expand All @@ -77,6 +89,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locatio
} else {
this.expirationTime = null;
}
this.appMetadata = (flt.getAppMetadata().size() == 0 ? null : flt.getAppMetadata().toByteArray());
this.ticket = new Ticket(flt.getTicket());
}

Expand All @@ -92,6 +105,10 @@ public Optional<Instant> getExpirationTime() {
return Optional.ofNullable(expirationTime);
}

public byte[] getAppMetadata() {
return appMetadata;
}

/**
* Converts to the protocol buffer representation.
*/
Expand All @@ -111,6 +128,10 @@ Flight.FlightEndpoint toProtocol() {
.build());
}

if (appMetadata != null) {
b.setAppMetadata(ByteString.copyFrom(appMetadata));
}

return b.build();
}

Expand Down Expand Up @@ -148,12 +169,13 @@ public boolean equals(Object o) {
FlightEndpoint that = (FlightEndpoint) o;
return locations.equals(that.locations) &&
ticket.equals(that.ticket) &&
Objects.equals(expirationTime, that.expirationTime);
Objects.equals(expirationTime, that.expirationTime) &&
Arrays.equals(appMetadata, that.appMetadata);
}

@Override
public int hashCode() {
return Objects.hash(locations, ticket, expirationTime);
return Objects.hash(locations, ticket, expirationTime, Arrays.hashCode(appMetadata));
}

@Override
Expand All @@ -162,6 +184,59 @@ public String toString() {
"locations=" + locations +
", ticket=" + ticket +
", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) +
", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}

/**
* Create a builder for FlightEndpoint.
*
* @param ticket A ticket that describe the key of a data stream.
* @param locations The possible locations the stream can be retrieved from.
*/
public static Builder builder(Ticket ticket, Location... locations) {
return new Builder(ticket, locations);
}

/**
* Builder for FlightEndpoint.
*/
public static final class Builder {
private final Ticket ticket;
private final List<Location> locations;
private Instant expirationTime = null;
private byte[] appMetadata = null;

private Builder(Ticket ticket, Location... locations) {
this.ticket = ticket;
this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)));
}

/**
* Set expiration time for the endpoint. Default is null, which means don't expire.
*
* @param expirationTime (optional) When this endpoint expires.
*/
public Builder setExpirationTime(Instant expirationTime) {
this.expirationTime = expirationTime;
return this;
}

/**
* Set the app metadata to send along with the flight. Default is null;
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}

/**
* Build FlightEndpoint object.
*/
public FlightEndpoint build() {
return new FlightEndpoint(ticket, expirationTime, appMetadata, locations);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class FlightInfo {
private final long records;
private final boolean ordered;
private final IpcOption option;
private final byte[] appMetadata;

/**
* Constructs a new instance.
Expand Down Expand Up @@ -94,6 +97,23 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
long records, boolean ordered, IpcOption option) {
this(schema, descriptor, endpoints, bytes, records, ordered, option, null);
}

/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
* @param ordered Whether the endpoints in this flight are ordered.
* @param option IPC write options.
* @param appMetadata Metadata to send along with the flight
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
long records, boolean ordered, IpcOption option, byte[] appMetadata) {
Objects.requireNonNull(descriptor);
Objects.requireNonNull(endpoints);
if (schema != null) {
Expand All @@ -106,6 +126,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
this.records = records;
this.ordered = ordered;
this.option = option;
this.appMetadata = appMetadata;
}

/**
Expand All @@ -131,6 +152,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
ordered = pbFlightInfo.getOrdered();
appMetadata = (pbFlightInfo.getAppMetadata().size() == 0 ? null : pbFlightInfo.getAppMetadata().toByteArray());
option = IpcOption.DEFAULT;
}

Expand Down Expand Up @@ -167,6 +189,10 @@ public boolean getOrdered() {
return ordered;
}

public byte[] getAppMetadata() {
return appMetadata;
}

/**
* Converts to the protocol buffer representation.
*/
Expand All @@ -189,6 +215,9 @@ Flight.FlightInfo toProtocol() {
throw new RuntimeException(e);
}
}
if (appMetadata != null) {
builder.setAppMetadata(ByteString.copyFrom(appMetadata));
}
return builder.build();
}

Expand Down Expand Up @@ -229,12 +258,13 @@ public boolean equals(Object o) {
schema.equals(that.schema) &&
descriptor.equals(that.descriptor) &&
endpoints.equals(that.endpoints) &&
ordered == that.ordered;
ordered == that.ordered &&
Arrays.equals(appMetadata, that.appMetadata);
}

@Override
public int hashCode() {
return Objects.hash(schema, descriptor, endpoints, bytes, records, ordered);
return Objects.hash(schema, descriptor, endpoints, bytes, records, ordered, Arrays.hashCode(appMetadata));
}

@Override
Expand All @@ -246,6 +276,95 @@ public String toString() {
", bytes=" + bytes +
", records=" + records +
", ordered=" + ordered +
", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}

/**
* Create a builder for FlightInfo.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
*/
public static Builder builder(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
return new Builder(schema, descriptor, endpoints);
}

/**
* Builder for FlightInfo.
*/
public static final class Builder {
private final Schema schema;
private final FlightDescriptor descriptor;
private final List<FlightEndpoint> endpoints;
private long bytes = -1;
private long records = -1;
private boolean ordered = false;
private IpcOption option = IpcOption.DEFAULT;
private byte[] appMetadata = null;

private Builder(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
this.schema = schema;
this.descriptor = descriptor;
this.endpoints = endpoints;
}

/**
* Set the number of bytes for the flight. Default to -1 for unknown.
*
* @param bytes The number of bytes in the flight
*/
public Builder setBytes(long bytes) {
this.bytes = bytes;
return this;
}

/**
* Set the number of records for the flight. Default to -1 for unknown.
*
* @param records The number of records in the flight.
*/
public Builder setRecords(long records) {
this.records = records;
return this;
}

/**
* Set whether the flight endpoints are ordered. Default is false.
*
* @param ordered Whether the endpoints in this flight are ordered.
*/
public Builder setOrdered(boolean ordered) {
this.ordered = ordered;
return this;
}

/**
* Set IPC write options. Default is IpcOption.DEFAULT
*
* @param option IPC write options.
*/
public Builder setOption(IpcOption option) {
this.option = option;
return this;
}

/**
* Set the app metadata to send along with the flight. Default is null.
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}

/**
* Build FlightInfo object.
*/
public FlightInfo build() {
return new FlightInfo(schema, descriptor, endpoints, bytes, records, ordered, option, appMetadata);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ public void roundTripInfo() throws Exception {
Field.nullable("a", new ArrowType.Int(32, true)),
Field.nullable("b", new ArrowType.FixedSizeBinary(32))
), metadata);
final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), Collections.emptyList(), -1, -1);
final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList())
.setAppMetadata("foo".getBytes()).build();
final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]),
Collections.singletonList(new FlightEndpoint(
new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500);
Collections.singletonList(
FlightEndpoint.builder(new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))
.setAppMetadata("bar".getBytes()).build()
), 200, 500);
final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"),
Arrays.asList(new FlightEndpoint(
new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -137,7 +138,8 @@ public void supportsNullSchemas() throws Exception
@Override
public FlightInfo getFlightInfo(CallContext context,
FlightDescriptor descriptor) {
return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0);
return new FlightInfo(null, descriptor, Collections.emptyList(),
0, 0, false, IpcOption.DEFAULT, "foo".getBytes());
}
};

Expand All @@ -147,6 +149,7 @@ public FlightInfo getFlightInfo(CallContext context,
FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test"));
Assertions.assertEquals(Optional.empty(), flightInfo.getSchemaOptional());
Assertions.assertEquals(new Schema(Collections.emptyList()), flightInfo.getSchema());
Assertions.assertArrayEquals(flightInfo.getAppMetadata(), "foo".getBytes());

Exception e = Assertions.assertThrows(
FlightRuntimeException.class,
Expand Down
Loading