Skip to content

Conversation

@cyb70289
Copy link
Contributor

This patch decouples flightrpc data plane from grpc so we can leverage
optimized data transfer libraries.

The basic idea is to replace grpc stream with a data plane stream for
FlightData transmission in DoGet/DoPut/DoExchange. There's no big change
to current flight client and server implementations. Added a wrapper to
support both grpc stream and data plane stream. By default, grpc stream
is used, which goes the current grpc based code path. If a data plane is
enabled (currently through environment variable), flight payload will go
through the data plane stream instead. See client.cc and server.cc to
review the changes.

About data plane implementation

  • data_plane/types.{h,cc}
    Defines client/server data plane and data plane stream interfaces.
    It's the only exported api to other component ({client,server}.cc).
  • data_plane/serialize.{h,cc}
    De-Serialize FlightData manually as we bypass grpc. Luckly, we already
    implemented related functions to support payload zero-copy.
  • shm.cc
    A shared memory driver to verify the data plane approach. The code may
    be a bit hard to read, it's better to focus on data plane interface
    implementations at first before dive deep into details like shared
    memory, ipc and buffer management related code.
    Please note there are still many caveats in current code, see TODO and
    XXX in shm.cc for details.

To evaluate this patch

I tested shared memory data plane on Linux (x86, Arm) and MacOS (Arm).
Build with -DARROW_FLIGHT_DP_SHM=ON to enable the shared memory data
plane. Set FLIGHT_DATAPLANE=shm environment variable to run unit tests
and benchmarks with the shared memory data plane enabled.

Build: cmake -DARROW_FLIGHT_DP_SHM=ON -DARROW_FLIGHT=ON ....
Test:  FLIGHT_DATAPLANE=shm release/arrow-flight-test
Bench: FLIGHT_DATAPLANE=shm release/arrow-flight-benchmark \
       -num_streams=1|2|4 -num_threads=1|2|4

Benchmark result (throughput, latency) on Xeon Gold 5218.
Test case: DoGet, batch size = 128KiB

streams grpc over unix socket shared memory data plane
1 3324 MB/s, 35 us 7045 MB/s, 16 us
2 6289 MB/s, 38 us 13311 MB/s, 17 us
4 10037 MB/s, 44 us 25012 MB/s, 17 us

@github-actions

This comment has been minimized.

@cyb70289
Copy link
Contributor Author

NOTE: The main purpose of this PR is to collect responses and seek for best approaches to support optimized data transfer methods other than grpc.

This patch decouples flightrpc data plane from grpc so we can leverage
optimized data transfer libraries.

The basic idea is to replace grpc stream with a data plane stream for
FlightData transmission in DoGet/DoPut/DoExchange. There's no big change
to current flight client and server implementations. Added a wrapper to
support both grpc stream and data plane stream. By default, grpc stream
is used, which goes the current grpc based code path. If a data plane is
enabled (currently through environment variable), flight payload will go
through the data plane stream instead. See client.cc and server.cc to
review the changes.

**About data plane implementation**

- data_plane/types.{h,cc}
  Defines client/server data plane and data plane stream interfaces.
  It's the only exported api to other component ({client,server}.cc).
- data_plane/serialize.{h,cc}
  De-Serialize FlightData manually as we bypass grpc. Luckly, we already
  implemented related functions to support payload zero-copy.
- shm.cc
  A shared memory driver to verify the data plane approach. The code may
  be a bit hard to read, it's better to focus on data plane interface
  implementations at first before dive deep into details like shared
  memory, ipc and buffer management related code.
  Please note there are still many caveats in current code, see TODO and
  XXX in shm.cc for details.

**To evaluate this patch**

I tested shared memory data plane on Linux (x86, Arm) and MacOS (Arm).
Build with `-DARROW_FLIGHT_DP_SHM=ON` to enable the shared memory data
plane. Set `FLIGHT_DATAPLANE=shm` environment variable to run unit tests
and benchmarks with the shared memory data plane enabled.

```
Build: cmake -DARROW_FLIGHT_DP_SHM=ON -DARROW_FLIGHT=ON ....
Test:  FLIGHT_DATAPLANE=shm release/arrow-flight-test
Bench: FLIGHT_DATAPLANE=shm release/arrow-flight-benchmark \
       -num_streams=1|2|4 -num_threads=1|2|4
```

Benchmark result (throughput, latency) on Xeon Gold 5218.
Test case: DoGet, batch size = 128KiB

| streams | grpc over unix socket | shared memory data plane |
| ------- | --------------------- | ------------------------ |
| 1       |  3324 MB/s,  35 us    |  7045 MB/s,  16 us       |
| 2       |  6289 MB/s,  38 us    | 13311 MB/s,  17 us       |
| 4       | 10037 MB/s,  44 us    | 25012 MB/s,  17 us       |
@cyb70289 cyb70289 requested a review from lidavidm January 20, 2022 08:11
Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the core idea here, to separate the data stream behind an interface and mostly decouple it from gRPC. I guess the difference between here and the UCX prototype is the degree of decoupling. For instance see https://github.com/lidavidm/arrow/blob/flight-ucx/cpp/src/arrow/flight/client.cc#L1587-L1597 where the "stream" gets no reference to gRPC at all and gRPC is on equal footing with the other backends, whereas here we still always make a gRPC call.

If we have a "simple" or "data only" backend, the approach here makes more sense. For something like UCX that can entirely replace gRPC, I'm not sure if it makes as much sense. Reconciling the two might be hard, unless we want to just have two entirely different extension points (with perhaps some shared interfaces for the data side of things) - that might be ok.

}

// TODO(yibo): getting data plane uri from env var is bad, shall we extend
// location to support two uri (control, data)? or any better approach to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpc+tcp://localhost:1337/?data=shm or something? Or maybe something like grpc+tcp+shm://... not sure what is semantically correct.

@lidavidm
Copy link
Member

I suppose the key thing is to consider the types of backends we want to support. If they look more like UCX or WebSockets and can entirely replace gRPC, that's one thing; if they look more like shared memory (or libfabrics? DPDK?) then this approach is probably easier.

Or maybe there's an approach where we allow swapping gRPC out entirely, but only for the data plane methods. Since it doesn't really add value to reimplement GetFlightInfo in UCX. (That argument breaks down somewhat for WebSockets, where I think using gRPC at all requires proxying or some other deployment configuration.)

@cyb70289
Copy link
Contributor Author

Thanks @lidavidm !

Now looks to me UCX transport is the better way.

My main concern of the data plane approach is that we have to build by ourselves the data transmission over raw data plane libraries. A robust, high performance communication system is hard enough and we'd better adopt mature frameworks like gRPC or UCX. 80% code of this PR is the shared memory driver, and it's still far from production quality (we need to handle cache management, flow control, and many other tricky things like race conditions).

I think we can leave this PR open to see if there are other comments.

@bkmgit
Copy link
Contributor

bkmgit commented Jan 21, 2022

How were the tests run? Assume the flight benchmark as described at https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/ was used.

@lidavidm
Copy link
Member

My main concern of the data plane approach is that we have to build by ourselves the data transmission over raw data plane libraries. A robust, high performance communication system is hard enough and we'd better adopt mature frameworks like gRPC or UCX.

To be fair, even with UCX there is still a fair amount of work to get Flight working on top. But I see your point - there is a lot of code that can be reused in Flight, but something very low level like shared memory without a helper library still requires a lot of work.

@cyb70289
Copy link
Contributor Author

How were the tests run? Assume the flight benchmark as described at https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/ was used.

Do you mean how to run flightrpc benchmark?

  • To build it you have to enable -DARROW_FLIGHT=ON in cmake.
  • To run client and server on same host: just run "arrow-flight-benchmark -num_streams=1 -num_threads=1" (change 1 to 2,4,... for more streams). It spawns server at the background automatically.
  • To run across network, you run "arrow-flight-perf-server" on server and "arrow-flight-benchmark" on client, you will need to specify server ip, see the cmdline helps for more options.
  • To verify this PR please see the commit message above.

@cyb70289
Copy link
Contributor Author

Close this PR. Prefer #12442.

@cyb70289 cyb70289 closed this Feb 17, 2022
@cyb70289 cyb70289 deleted the flight-data-plane branch May 7, 2022 07:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants