Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,8 @@ macro(build_substrait)
# Note: not all protos in Substrait actually matter to plan
# consumption. No need to build the ones we don't need.
set(SUBSTRAIT_PROTOS algebra extensions/extensions plan type)
set(ARROW_SUBSTRAIT_PROTOS extension_rels)
set(ARROW_SUBSTRAIT_PROTOS_DIR "${CMAKE_SOURCE_DIR}/proto")

externalproject_add(substrait_ep
${EP_COMMON_OPTIONS}
Expand Down Expand Up @@ -1789,6 +1791,27 @@ macro(build_substrait)

list(APPEND SUBSTRAIT_SOURCES "${SUBSTRAIT_PROTO_GEN}.cc")
endforeach()
# Generate C++ sources for the protos listed in ARROW_SUBSTRAIT_PROTOS, which
# are read from ARROW_SUBSTRAIT_PROTOS_DIR. "${SUBSTRAIT_LOCAL_DIR}/proto" is
# also put on the protoc include path so these protos can import the Substrait
# ones (e.g. substrait/algebra.proto).
foreach(ARROW_SUBSTRAIT_PROTO ${ARROW_SUBSTRAIT_PROTOS})
  set(ARROW_SUBSTRAIT_PROTO_SRC
      "${ARROW_SUBSTRAIT_PROTOS_DIR}/substrait/${ARROW_SUBSTRAIT_PROTO}.proto")
  set(ARROW_SUBSTRAIT_PROTO_GEN
      "${SUBSTRAIT_CPP_DIR}/substrait/${ARROW_SUBSTRAIT_PROTO}.pb")

  # Mark both generated files so they are compiled with the suppressed-warning
  # flags and are kept out of unity builds.
  foreach(EXT h cc)
    set_source_files_properties("${ARROW_SUBSTRAIT_PROTO_GEN}.${EXT}"
                                PROPERTIES COMPILE_OPTIONS
                                           "${SUBSTRAIT_SUPPRESSED_FLAGS}"
                                           GENERATED TRUE
                                           SKIP_UNITY_BUILD_INCLUSION TRUE)
    list(APPEND SUBSTRAIT_PROTO_GEN_ALL "${ARROW_SUBSTRAIT_PROTO_GEN}.${EXT}")
  endforeach()

  # The .proto source is listed in DEPENDS so that editing it re-runs protoc;
  # without it the generated .pb.{h,cc} would go stale after a proto change.
  add_custom_command(OUTPUT "${ARROW_SUBSTRAIT_PROTO_GEN}.cc"
                            "${ARROW_SUBSTRAIT_PROTO_GEN}.h"
                     COMMAND ${ARROW_PROTOBUF_PROTOC} "-I${SUBSTRAIT_LOCAL_DIR}/proto"
                             "-I${ARROW_SUBSTRAIT_PROTOS_DIR}"
                             "--cpp_out=${SUBSTRAIT_CPP_DIR}"
                             "${ARROW_SUBSTRAIT_PROTO_SRC}"
                     DEPENDS ${PROTO_DEPENDS} substrait_ep
                             "${ARROW_SUBSTRAIT_PROTO_SRC}"
                     VERBATIM)

  list(APPEND SUBSTRAIT_SOURCES "${ARROW_SUBSTRAIT_PROTO_GEN}.cc")
endforeach()

add_custom_target(substrait_gen ALL DEPENDS ${SUBSTRAIT_PROTO_GEN_ALL})

Expand Down
44 changes: 44 additions & 0 deletions cpp/proto/substrait/extension_rels.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
syntax = "proto3";

package arrow.substrait_ext;

import "substrait/algebra.proto";

option csharp_namespace = "Arrow.Substrait";
option go_package = "github.com/apache/arrow/substrait";
option java_multiple_files = true;
option java_package = "io.arrow.substrait";

// As-Of-Join relation
//
// Describes an as-of join over several input relations. The entries of `keys`
// pair up positionally with the inputs, so there is expected to be exactly one
// entry per input relation.
message AsOfJoinRel {
// One key per input relation, each key describing how to join the corresponding input
repeated AsOfJoinKey keys = 1;

// As-Of tolerance, in units of the on-key
// NOTE(review): the sign convention / direction of the tolerance is not
// specified in this file - confirm against the consumer.
int64 tolerance = 2;

// As-Of-Join key
//
// The join key of a single input relation: one on-key expression plus a
// (possibly empty) list of by-key expressions.
message AsOfJoinKey {
// A field reference defining the on-key
.substrait.Expression on = 1;

// A set of field references defining the by-key
// NOTE(review): AsofJoinNode's GetByKeySize rejects inputs whose by-key counts
// differ ("inconsistent size of by-key across inputs"); presumably the same
// constraint applies to plans built from this message - confirm.
repeated .substrait.Expression by = 2;
}
}
12 changes: 11 additions & 1 deletion cpp/src/arrow/compute/exec/asof_join_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,19 @@ static void TableJoinOverhead(benchmark::State& state,
benchmark::Counter(static_cast<double>(default_memory_pool()->max_memory()));
}

/// \brief Build as-of-join options in which every input uses the same keys
///
/// \param repeat number of key entries to generate (one per join input)
/// \param on_key the on-key shared by all inputs
/// \param by_key the by-key shared by all inputs
/// \param tolerance the tolerance forwarded to AsofJoinNodeOptions
/// \return options holding `repeat` copies of {on_key, by_key}
///
/// Declared `static` like the other helpers in this benchmark file: it is only
/// used locally and should not have external linkage.
static AsofJoinNodeOptions GetRepeatedOptions(size_t repeat, FieldRef on_key,
                                              std::vector<FieldRef> by_key,
                                              int64_t tolerance) {
  // Fill constructor: `repeat` copies of the same key pair, instead of
  // default-constructing every element and overwriting it in a loop.
  std::vector<AsofJoinNodeOptions::Keys> input_keys(
      repeat, AsofJoinNodeOptions::Keys{on_key, by_key});
  return AsofJoinNodeOptions(input_keys, tolerance);
}

static void AsOfJoinOverhead(benchmark::State& state) {
int64_t tolerance = 0;
AsofJoinNodeOptions options = AsofJoinNodeOptions(kTimeCol, {kKeyCol}, tolerance);
AsofJoinNodeOptions options =
GetRepeatedOptions(int(state.range(4)), kTimeCol, {kKeyCol}, tolerance);
TableJoinOverhead(
state,
TableGenerationProperties{int(state.range(0)), int(state.range(1)),
Expand Down
107 changes: 85 additions & 22 deletions cpp/src/arrow/compute/exec/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/asof_join_node.h"

#include <condition_variable>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -997,28 +999,27 @@ class AsofJoinNode : public ExecNode {
}

static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
const std::vector<ExecNode*>& inputs,
const std::vector<std::shared_ptr<Schema>> input_schema,
const std::vector<col_index_t>& indices_of_on_key,
const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
std::vector<std::shared_ptr<arrow::Field>> fields;

size_t n_by = indices_of_by_key[0].size();
size_t n_by = indices_of_by_key.size() == 0 ? 0 : indices_of_by_key[0].size();
const DataType* on_key_type = NULLPTR;
std::vector<const DataType*> by_key_type(n_by, NULLPTR);
// Take all non-key, non-time RHS fields
for (size_t j = 0; j < inputs.size(); ++j) {
const auto& input_schema = inputs[j]->output_schema();
for (size_t j = 0; j < input_schema.size(); ++j) {
const auto& on_field_ix = indices_of_on_key[j];
const auto& by_field_ix = indices_of_by_key[j];

if ((on_field_ix == -1) || std_has(by_field_ix, -1)) {
return Status::Invalid("Missing join key on table ", j);
}

const auto& on_field = input_schema->fields()[on_field_ix];
const auto& on_field = input_schema[j]->fields()[on_field_ix];
std::vector<const Field*> by_field(n_by);
for (size_t k = 0; k < n_by; k++) {
by_field[k] = input_schema->fields()[by_field_ix[k]].get();
by_field[k] = input_schema[j]->fields()[by_field_ix[k]].get();
}

if (on_key_type == NULLPTR) {
Expand All @@ -1038,8 +1039,8 @@ class AsofJoinNode : public ExecNode {
}
}

for (int i = 0; i < input_schema->num_fields(); ++i) {
const auto field = input_schema->field(i);
for (int i = 0; i < input_schema[j]->num_fields(); ++i) {
const auto field = input_schema[j]->field(i);
if (i == on_field_ix) {
ARROW_RETURN_NOT_OK(is_valid_on_field(field));
// Only add on field from the left table
Expand Down Expand Up @@ -1076,6 +1077,56 @@ class AsofJoinNode : public ExecNode {
return match.indices()[0];
}

// Return the number of by-key fields shared by all inputs.
//
// The first entry of `input_keys` defines the expected count (0 when
// `input_keys` is empty); any entry with a different count yields
// Status::Invalid.
static Result<size_t> GetByKeySize(
    const std::vector<asofjoin::AsofJoinKeys>& input_keys) {
  const size_t expected_n_by =
      input_keys.empty() ? 0 : input_keys.front().by_key.size();
  for (const auto& keys : input_keys) {
    if (keys.by_key.size() != expected_n_by) {
      return Status::Invalid("inconsistent size of by-key across inputs");
    }
  }
  return expected_n_by;
}

// Resolve the on-key of every input to a column index in that input's schema.
//
// `input_schema` and `input_keys` must have the same length, otherwise
// Status::Invalid is returned. Any error reported by FindColIndex is
// propagated. On success the result holds one index per input, in input order.
static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
    const std::vector<std::shared_ptr<Schema>>& input_schema,
    const std::vector<asofjoin::AsofJoinKeys>& input_keys) {
  const size_t num_inputs = input_schema.size();
  if (num_inputs != input_keys.size()) {
    return Status::Invalid("mismatching number of input schema and keys");
  }

  std::vector<col_index_t> on_key_indices;
  on_key_indices.reserve(num_inputs);
  for (size_t idx = 0; idx < num_inputs; ++idx) {
    ARROW_ASSIGN_OR_RAISE(
        col_index_t on_key_index,
        FindColIndex(*input_schema[idx], input_keys[idx].on_key, "on"));
    on_key_indices.push_back(on_key_index);
  }
  return on_key_indices;
}

// Resolve the by-key fields of every input to column indices in that input's
// schema.
//
// `input_schema` and `input_keys` must have the same length, and all inputs
// must carry the same number of by-key fields (checked via GetByKeySize);
// otherwise an error status is returned. Any error reported by FindColIndex is
// propagated. On success, element [i][k] is the index of the k-th by-key field
// within the schema of input i.
static Result<std::vector<std::vector<col_index_t>>> GetIndicesOfByKey(
    const std::vector<std::shared_ptr<Schema>>& input_schema,
    const std::vector<asofjoin::AsofJoinKeys>& input_keys) {
  const size_t num_inputs = input_schema.size();
  if (num_inputs != input_keys.size()) {
    return Status::Invalid("mismatching number of input schema and keys");
  }
  ARROW_ASSIGN_OR_RAISE(size_t num_by, GetByKeySize(input_keys));

  std::vector<std::vector<col_index_t>> by_key_indices(
      num_inputs, std::vector<col_index_t>(num_by));
  for (size_t idx = 0; idx < num_inputs; ++idx) {
    const auto& by_key = input_keys[idx].by_key;
    auto& indices_for_input = by_key_indices[idx];
    for (size_t k = 0; k < num_by; ++k) {
      ARROW_ASSIGN_OR_RAISE(indices_for_input[k],
                            FindColIndex(*input_schema[idx], by_key[k], "by"));
    }
  }
  return by_key_indices;
}

static arrow::Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
DCHECK_GE(inputs.size(), 2) << "Must have at least two inputs";
Expand All @@ -1086,24 +1137,21 @@ class AsofJoinNode : public ExecNode {
join_options.tolerance);
}

size_t n_input = inputs.size(), n_by = join_options.by_key.size();
ARROW_ASSIGN_OR_RAISE(size_t n_by, GetByKeySize(join_options.input_keys));
size_t n_input = inputs.size();
std::vector<std::string> input_labels(n_input);
std::vector<col_index_t> indices_of_on_key(n_input);
std::vector<std::vector<col_index_t>> indices_of_by_key(
n_input, std::vector<col_index_t>(n_by));
std::vector<std::shared_ptr<Schema>> input_schema(n_input);
for (size_t i = 0; i < n_input; ++i) {
input_labels[i] = i == 0 ? "left" : "right_" + ToChars(i);
const Schema& input_schema = *inputs[i]->output_schema();
ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
FindColIndex(input_schema, join_options.on_key, "on"));
for (size_t k = 0; k < n_by; k++) {
ARROW_ASSIGN_OR_RAISE(indices_of_by_key[i][k],
FindColIndex(input_schema, join_options.by_key[k], "by"));
}
input_schema[i] = inputs[i]->output_schema();
}

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> output_schema,
MakeOutputSchema(inputs, indices_of_on_key, indices_of_by_key));
ARROW_ASSIGN_OR_RAISE(std::vector<col_index_t> indices_of_on_key,
GetIndicesOfOnKey(input_schema, join_options.input_keys));
ARROW_ASSIGN_OR_RAISE(std::vector<std::vector<col_index_t>> indices_of_by_key,
GetIndicesOfByKey(input_schema, join_options.input_keys));
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Schema> output_schema,
MakeOutputSchema(input_schema, indices_of_on_key, indices_of_by_key));

std::vector<std::unique_ptr<KeyHasher>> key_hashers;
for (size_t i = 0; i < n_input; i++) {
Expand Down Expand Up @@ -1213,5 +1261,20 @@ void RegisterAsofJoinNode(ExecFactoryRegistry* registry) {
}
} // namespace internal

namespace asofjoin {

// Compute the output schema of an as-of-join from the input schemas and the
// per-input join keys.
//
// The on-key and by-key fields are first resolved to column indices through
// AsofJoinNode::GetIndicesOfOnKey / GetIndicesOfByKey; any error from those
// lookups is returned as-is. The resolved indices are then handed to
// AsofJoinNode::MakeOutputSchema, whose result is returned.
Result<std::shared_ptr<Schema>> MakeOutputSchema(
    const std::vector<std::shared_ptr<Schema>>& input_schema,
    const std::vector<AsofJoinKeys>& input_keys) {
  ARROW_ASSIGN_OR_RAISE(auto on_key_indices,
                        AsofJoinNode::GetIndicesOfOnKey(input_schema, input_keys));
  ARROW_ASSIGN_OR_RAISE(auto by_key_indices,
                        AsofJoinNode::GetIndicesOfByKey(input_schema, input_keys));
  return AsofJoinNode::MakeOutputSchema(input_schema, on_key_indices, by_key_indices);
}

}  // namespace asofjoin

} // namespace compute
} // namespace arrow
37 changes: 37 additions & 0 deletions cpp/src/arrow/compute/exec/asof_join_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Guard against multiple inclusion: this header declares a type alias and a
// function and is included by asof_join_node.cc.
#pragma once

#include <memory>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/options.h"
#include "arrow/type.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace compute {
namespace asofjoin {

/// \brief The join keys (on-key and by-key) of a single as-of-join input
using AsofJoinKeys = AsofJoinNodeOptions::Keys;

/// \brief Make the output schema of an as-of-join node
///
/// \param[in] input_schema the schema of each input to the node
/// \param[in] input_keys the join keys of each input to the node; must have
///            the same length as `input_schema`
/// \return the output schema, or an error if the keys cannot be resolved
///         against the input schemas
ARROW_EXPORT Result<std::shared_ptr<Schema>> MakeOutputSchema(
    const std::vector<std::shared_ptr<Schema>>& input_schema,
    const std::vector<AsofJoinKeys>& input_keys);

}  // namespace asofjoin
}  // namespace compute
}  // namespace arrow
Loading