Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")

define_option(ARROW_FLIGHT_SQL "Build the Arrow Flight SQL extension" OFF)

define_option(ARROW_FLIGHT_DP_SHM "Build the Arrow Flight shared memory data plane" OFF)

define_option(ARROW_GANDIVA "Build the Gandiva libraries" OFF)

define_option(ARROW_GCS
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ endif()
# </KLUDGE> Restore the CXXFLAGS that were modified above
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_BACKUP}")

# Data plane source files
# TODO(yibo): separate common (serialize.cc) and driver (shm.cc)
if(ARROW_FLIGHT_DP_SHM)
add_definitions(-DFLIGHT_DP_SHM)
set(DATAPLANE_SRCS data_plane/serialize.cc data_plane/shm.cc)
endif()

# Note, we do not compile the generated Protobuf sources directly, instead
# compiling then via protocol_internal.cc which contains some gRPC template
# overrides to enable Flight-specific optimizations. See comments in
Expand All @@ -164,7 +171,8 @@ set(ARROW_FLIGHT_SRCS
serialization_internal.cc
server.cc
server_auth.cc
types.cc)
types.cc
data_plane/types.cc)

add_arrow_lib(arrow_flight
CMAKE_PACKAGE_NAME
Expand All @@ -175,6 +183,7 @@ add_arrow_lib(arrow_flight
ARROW_FLIGHT_LIBRARIES
SOURCES
${ARROW_FLIGHT_SRCS}
${DATAPLANE_SRCS}
PRECOMPILED_HEADERS
"$<$<COMPILE_LANGUAGE:CXX>:arrow/flight/pch.h>"
DEPENDENCIES
Expand Down
251 changes: 181 additions & 70 deletions cpp/src/arrow/flight/client.cc

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions cpp/src/arrow/flight/data_plane/internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>

#include "arrow/result.h"

namespace arrow {
namespace flight {
namespace internal {

class ClientDataPlane;
class ServerDataPlane;

enum class StreamType { kGet, kPut, kExchange };

struct DataPlaneMaker {
arrow::Result<std::unique_ptr<ClientDataPlane>> (*make_client)(const std::string&);
arrow::Result<std::unique_ptr<ServerDataPlane>> (*make_server)(const std::string&);
};

// data plane makers are defined in data plane drivers
DataPlaneMaker GetShmDataPlaneMaker();
DataPlaneMaker GetUcxDataPlaneMaker();

} // namespace internal
} // namespace flight
} // namespace arrow
106 changes: 106 additions & 0 deletions cpp/src/arrow/flight/data_plane/serialize.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/data_plane/serialize.h"
#include "arrow/flight/customize_protobuf.h"
#include "arrow/flight/internal.h"
#include "arrow/flight/serialization_internal.h"
#include "arrow/result.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"

#include <grpc/byte_buffer.h>
#include <grpc/slice.h>

#include <cstring>

namespace arrow {
namespace flight {
namespace internal {

namespace {

void ReleaseBuffer(void* buffer_ptr) {
delete reinterpret_cast<std::shared_ptr<Buffer>*>(buffer_ptr);
}

} // namespace

Status Deserialize(std::shared_ptr<Buffer> buffer, FlightData* data) {
// hold the buffer
std::shared_ptr<Buffer>* buffer_ptr = new std::shared_ptr<Buffer>(std::move(buffer));

const grpc::Slice slice((*buffer_ptr)->mutable_data(),
static_cast<size_t>((*buffer_ptr)->size()), &ReleaseBuffer,
buffer_ptr);
grpc::ByteBuffer bbuf = grpc::ByteBuffer(&slice, 1);

{
// make sure GrpcBuffer::Wrap goes the wanted path
auto grpc_bbuf = *reinterpret_cast<grpc_byte_buffer**>(&bbuf);
DCHECK_EQ(grpc_bbuf->type, GRPC_BB_RAW);
DCHECK_EQ(grpc_bbuf->data.raw.compression, GRPC_COMPRESS_NONE);
DCHECK_EQ(grpc_bbuf->data.raw.slice_buffer.count, 1);
grpc_slice slice = grpc_bbuf->data.raw.slice_buffer.slices[0];
DCHECK_NE(slice.refcount, 0);
}

// buffer ownership is transferred to "data" on success
const Status st = FromGrpcStatus(FlightDataDeserialize(&bbuf, data));
if (!st.ok()) {
delete buffer_ptr;
}
return st;
}

SerializeSlice::SerializeSlice(grpc::Slice&& slice) {
slice_ = arrow::internal::make_unique<grpc::Slice>(std::move(slice));
}
SerializeSlice::SerializeSlice(SerializeSlice&&) = default;
SerializeSlice::~SerializeSlice() = default;

const uint8_t* SerializeSlice::data() const { return slice_->begin(); }
int64_t SerializeSlice::size() const { return static_cast<int64_t>(slice_->size()); }

arrow::Result<std::vector<SerializeSlice>> Serialize(const FlightPayload& payload,
int64_t* total_size) {
RETURN_NOT_OK(payload.Validate());

grpc::ByteBuffer bbuf;
bool owner;
RETURN_NOT_OK(FromGrpcStatus(FlightDataSerialize(payload, &bbuf, &owner)));

if (total_size) {
*total_size = static_cast<int64_t>(bbuf.Length());
}

// ByteBuffer::Dump doesn't copy data buffer, IIUC
std::vector<grpc::Slice> grpc_slices;
RETURN_NOT_OK(FromGrpcStatus(bbuf.Dump(&grpc_slices)));

// move grpc slice life cycle to returned serialize slice
std::vector<SerializeSlice> slices;
for (auto& grpc_slice : grpc_slices) {
SerializeSlice slice(std::move(grpc_slice));
slices.emplace_back(std::move(slice));
}
return slices;
}

} // namespace internal
} // namespace flight
} // namespace arrow
92 changes: 92 additions & 0 deletions cpp/src/arrow/flight/data_plane/serialize.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/buffer.h"
#include "arrow/result.h"

#include <memory>
#include <vector>

namespace grpc {

class Slice;

}; // namespace grpc

namespace arrow {
namespace flight {

struct FlightPayload;

namespace internal {

struct FlightData;

// Reader buffer management
// # data plane receives data and creates/stores to one reader buffer
// # Deserialize() creates a new shared_ptr to hold the buffer, creates a
// gprc slice per that buffer, and installs destroyer callback, which
// deletes the created shared_ptr when grpc slice is freed
// # Deserialize() calls FlightDataDeserialize() which transfers grpc slice
// lifecyce to Buffer object (FlightData->body) managed by the end consumer
// * see GrpcBuffer::slice_ in serialization_internal.cc
// # releasing the reader buffer
// # end consumer frees FlightData
// # frees Buffer(GrpcBuffer) object
// # frees grpc slice GrpcBuffer::slice_
// # invokes destroyer to release reader buffer

// Reader buffer must be a continuous memory block, see GrpcBuffer::Wrap
// - FlightDataDeserialize will copy and flatten non-continuous blocks anyway
// - it's necessary to get destroyer called

Status Deserialize(std::shared_ptr<Buffer> buffer, FlightData* data);

// Writer buffer management
// # FlightDataSerialize() holds buffer (FlightPayload.ipc_msg.body_buffer[i])
// in returned grpc bbuf
// * see SliceFromBuffer in serialization_internal.cc
// # dump grpc bbuf to a vector of grpc slice, then move to SerializeSlice[]
// # data plane sends data per returned SerializeSlice[]
// # releasing the writer buffer
// # data plane frees vector<SerializeSlice>
// # frees grpc slice SerializeSlice::slice_
// # release writer buffer

// a simple wrapper of grpc::Slice, the only purpose is to hide grpc
// from data plane implementation
class SerializeSlice {
public:
explicit SerializeSlice(grpc::Slice&& slice);
SerializeSlice(SerializeSlice&&);
~SerializeSlice();

const uint8_t* data() const;
int64_t size() const;

private:
std::unique_ptr<grpc::Slice> slice_;
};

arrow::Result<std::vector<SerializeSlice>> Serialize(const FlightPayload& payload,
int64_t* total_size = NULLPTR);

} // namespace internal
} // namespace flight
} // namespace arrow
Loading