Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions cpp/src/arrow/compute/exec/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,23 +526,6 @@ Result<Datum> ExecuteScalarExpression(const Expression& expr, const Datum& input
"ExecuteScalarExpression cannot Execute non-scalar expression ", expr.ToString());
}

if (input.kind() == Datum::TABLE) {
TableBatchReader reader(*input.table());
std::shared_ptr<RecordBatch> batch;

while (true) {
RETURN_NOT_OK(reader.ReadNext(&batch));
if (batch != nullptr) {
break;
}
ARROW_ASSIGN_OR_RAISE(Datum res, ExecuteScalarExpression(expr, batch));
if (res.is_scalar()) {
ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows(),
exec_context->memory_pool()));
}
}
}

if (auto lit = expr.literal()) return *lit;

if (auto ref = expr.field_ref()) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec/expression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ TEST(Expression, ToString) {
EXPECT_EQ(literal("a").ToString(), "\"a\"");
EXPECT_EQ(literal("a\nb").ToString(), "\"a\\nb\"");
EXPECT_EQ(literal(std::make_shared<BooleanScalar>()).ToString(), "null");
EXPECT_EQ(literal(std::make_shared<Int64Scalar>()).ToString(), "null");
EXPECT_EQ(literal(std::make_shared<BinaryScalar>(Buffer::FromString("az"))).ToString(),
"\"617A\"");

Expand Down Expand Up @@ -540,7 +541,7 @@ void ExpectExecute(Expression expr, Datum in, Datum* actual_out = NULLPTR) {
if (in.is_value()) {
ASSERT_OK_AND_ASSIGN(expr, expr.Bind(in.descr()));
} else {
ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*in.record_batch()->schema()));
ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*in.schema()));
}

ASSERT_OK_AND_ASSIGN(Datum actual, ExecuteScalarExpression(expr, in));
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ inline Result<FragmentIterator> GetFragmentsFromDatasets(const DatasetVector& da
return MakeFlattenIterator(std::move(fragments_it));
}

inline RecordBatchIterator IteratorFromReader(
const std::shared_ptr<RecordBatchReader>& reader) {
return MakeFunctionIterator([reader] { return reader->Next(); });
}

inline std::shared_ptr<Schema> SchemaFromColumnNames(
const std::shared_ptr<Schema>& input, const std::vector<std::string>& column_names) {
std::vector<std::shared_ptr<Field>> columns;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
std::shared_ptr<Array> actual_struct;

for (auto maybe_batch :
IteratorFromReader(std::make_shared<TableBatchReader>(*actual_table))) {
MakeIteratorFromReader(std::make_shared<TableBatchReader>(*actual_table))) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray());
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class ARROW_EXPORT RecordBatch {
/// \brief Abstract interface for reading stream of record batches
class ARROW_EXPORT RecordBatchReader {
public:
using ValueType = std::shared_ptr<RecordBatch>;

virtual ~RecordBatchReader() = default;

/// \return the shared schema of the record batches in the stream
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -560,4 +560,10 @@ Iterator<T> MakeFlattenIterator(Iterator<Iterator<T>> it) {
return Iterator<T>(FlattenIterator<T>(std::move(it)));
}

template <typename Reader>
Iterator<typename Reader::ValueType> MakeIteratorFromReader(
const std::shared_ptr<Reader>& reader) {
return MakeFunctionIterator([reader] { return reader->Next(); });
}

} // namespace arrow