Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 125 additions & 8 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,99 @@ int64_t ArrayData::GetNullCount() const {

int64_t Array::null_count() const { return data_->GetNullCount(); }

namespace internal {

// Visitor that boxes a single slot of an Array into a Scalar.
//
// Usage: construct with the array and the slot index, then call the
// rvalue-qualified Finish() exactly once. Finish() validates the index,
// short-circuits null slots, and otherwise dispatches to the Visit() overload
// matching the array's concrete type, which stores its result in out_.
struct ScalarFromArraySlotImpl {
  template <typename T>
  using ScalarType = typename TypeTraits<T>::ScalarType;

  // A NullArray has no values to read; produce a NullScalar directly.
  Status Visit(const NullArray&) {
    out_ = std::make_shared<NullScalar>();
    return Status::OK();
  }

  // Boolean, numeric and day-time interval slots are boxed from their value.
  Status Visit(const BooleanArray& a) { return Finish(a.Value(index_)); }

  template <typename T>
  Status Visit(const NumericArray<T>& a) {
    return Finish(a.Value(index_));
  }

  // Binary-like slots are fetched as a std::string, which the
  // Finish(std::string) overload wraps in a Buffer.
  template <typename T>
  Status Visit(const BaseBinaryArray<T>& a) {
    return Finish(a.GetString(index_));
  }

  Status Visit(const FixedSizeBinaryArray& a) { return Finish(a.GetString(index_)); }

  Status Visit(const DayTimeIntervalArray& a) { return Finish(a.Value(index_)); }

  // List-like slots are boxed from the slice of child values for this slot.
  template <typename T>
  Status Visit(const BaseListArray<T>& a) {
    return Finish(a.value_slice(index_));
  }

  Status Visit(const FixedSizeListArray& a) { return Finish(a.value_slice(index_)); }

  // Struct slots are boxed field by field: each child array contributes the
  // scalar at the same index.
  Status Visit(const StructArray& a) {
    ScalarVector children;
    for (const auto& child : a.fields()) {
      children.emplace_back();
      ARROW_ASSIGN_OR_RAISE(children.back(), child->GetScalar(index_));
    }
    return Finish(std::move(children));
  }

  Status Visit(const UnionArray&) {
    return Status::NotImplemented("Non-null UnionScalar");
  }

  // Dictionary slots: resolve the slot's index into the dictionary, then hand
  // the dictionary value's scalar to Finish.
  Status Visit(const DictionaryArray& a) {
    ARROW_ASSIGN_OR_RAISE(auto value, a.dictionary()->GetScalar(a.GetValueIndex(index_)));
    return Finish(std::move(value));
  }

  Status Visit(const ExtensionArray&) {
    return Status::NotImplemented("Non-null ExtensionScalar");
  }

  // Build a scalar of the array's type from `arg` and store it in out_.
  template <typename Arg>
  Status Finish(Arg&& arg) {
    return MakeScalar(array_.type(), std::forward<Arg>(arg)).Value(&out_);
  }

  // String overload: the bytes are moved into a Buffer before boxing.
  Status Finish(std::string arg) {
    return MakeScalar(array_.type(), Buffer::FromString(std::move(arg))).Value(&out_);
  }

  // Entry point. Returns IndexError for an out-of-range index, a null scalar
  // of the array's type for a null slot, and otherwise the visited value.
  Result<std::shared_ptr<Scalar>> Finish() && {
    // Reject negative indices as well as indices past the end: both would
    // otherwise reach IsNull()/Value() and read outside the array.
    if (index_ < 0 || index_ >= array_.length()) {
      return Status::IndexError("tried to refer to element ", index_,
                                " but array is only ", array_.length(), " long");
    }

    if (array_.IsNull(index_)) {
      return MakeNullScalar(array_.type());
    }

    RETURN_NOT_OK(VisitArrayInline(array_, this));
    return std::move(out_);
  }

  ScalarFromArraySlotImpl(const Array& array, int64_t index)
      : array_(array), index_(index) {}

  const Array& array_;           // array being read; must outlive this object
  int64_t index_;                // slot to box
  std::shared_ptr<Scalar> out_;  // result written by the Visit() overloads
};

} // namespace internal

// Box the value stored at slot `i` of this array into a Scalar by delegating
// to internal::ScalarFromArraySlotImpl.
Result<std::shared_ptr<Scalar>> Array::GetScalar(int64_t i) const {
  internal::ScalarFromArraySlotImpl impl{*this, i};
  // Finish() is rvalue-qualified, so the helper must be consumed via std::move.
  return std::move(impl).Finish();
}

std::string Array::Diff(const Array& other) const {
std::stringstream diff;
ARROW_IGNORE_EXPR(Equals(other, EqualOptions().diff_sink(&diff)));
Expand Down Expand Up @@ -363,20 +456,20 @@ Result<std::shared_ptr<Array>> FlattenListArray(const ListArrayT& list_array,

} // namespace

ListArray::ListArray(const std::shared_ptr<ArrayData>& data) { SetData(data); }
ListArray::ListArray(std::shared_ptr<ArrayData> data) { SetData(std::move(data)); }

LargeListArray::LargeListArray(const std::shared_ptr<ArrayData>& data) { SetData(data); }

ListArray::ListArray(const std::shared_ptr<DataType>& type, int64_t length,
const std::shared_ptr<Buffer>& value_offsets,
const std::shared_ptr<Array>& values,
const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count,
ListArray::ListArray(std::shared_ptr<DataType> type, int64_t length,
std::shared_ptr<Buffer> value_offsets, std::shared_ptr<Array> values,
std::shared_ptr<Buffer> null_bitmap, int64_t null_count,
int64_t offset) {
ARROW_CHECK_EQ(type->id(), Type::LIST);
auto internal_data =
ArrayData::Make(type, length, {null_bitmap, value_offsets}, null_count, offset);
auto internal_data = ArrayData::Make(
std::move(type), length,
BufferVector{std::move(null_bitmap), std::move(value_offsets)}, null_count, offset);
internal_data->child_data.emplace_back(values->data());
SetData(internal_data);
SetData(std::move(internal_data));
}

LargeListArray::LargeListArray(const std::shared_ptr<DataType>& type, int64_t length,
Expand Down Expand Up @@ -788,6 +881,13 @@ const StructType* StructArray::struct_type() const {
return checked_cast<const StructType*>(data_->type.get());
}

// Return all child arrays of this struct array as a vector.
const ArrayVector& StructArray::fields() const {
  // field(i) is called only for its effect and its result is discarded;
  // presumably it fills the boxed_fields_[i] cache slot on first access --
  // TODO confirm against field()'s full definition.
  int pos = 0;
  while (pos < num_fields()) {
    (void)field(pos);
    ++pos;
  }
  return boxed_fields_;
}

std::shared_ptr<Array> StructArray::field(int i) const {
std::shared_ptr<Array> result = internal::atomic_load(&boxed_fields_[i]);
if (!result) {
Expand Down Expand Up @@ -1034,6 +1134,23 @@ Status ValidateDictionaryIndices(const std::shared_ptr<Array>& indices,

std::shared_ptr<Array> DictionaryArray::indices() const { return indices_; }

// Return the i-th value of the indices array, widened to int64_t.
// Only INT8/INT16/INT32/INT64 index arrays are handled; any other index type
// trips the ARROW_CHECK below.
int64_t DictionaryArray::GetValueIndex(int64_t i) const {
  const auto index_type_id = indices_->type_id();

  if (index_type_id == Type::INT8) {
    return checked_cast<const Int8Array&>(*indices_).Value(i);
  }
  if (index_type_id == Type::INT16) {
    return checked_cast<const Int16Array&>(*indices_).Value(i);
  }
  if (index_type_id == Type::INT32) {
    return checked_cast<const Int32Array&>(*indices_).Value(i);
  }
  if (index_type_id == Type::INT64) {
    return checked_cast<const Int64Array&>(*indices_).Value(i);
  }

  ARROW_CHECK(false) << "unreachable";
  return -1;
}

DictionaryArray::DictionaryArray(const std::shared_ptr<ArrayData>& data)
: dict_type_(checked_cast<const DictionaryType*>(data->type.get())) {
ARROW_CHECK_EQ(data->type->id(), Type::DICTIONARY);
Expand Down
19 changes: 13 additions & 6 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ class ARROW_EXPORT Array {
BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
}

/// \brief Return a Scalar containing the value of this array at i
Result<std::shared_ptr<Scalar>> GetScalar(int64_t i) const;

/// Size in the number of elements this array contains.
int64_t length() const { return data_->length; }

Expand Down Expand Up @@ -615,12 +618,11 @@ class BaseListArray : public Array {
/// Concrete Array class for list data
class ARROW_EXPORT ListArray : public BaseListArray<ListType> {
public:
explicit ListArray(const std::shared_ptr<ArrayData>& data);
explicit ListArray(std::shared_ptr<ArrayData> data);

ListArray(const std::shared_ptr<DataType>& type, int64_t length,
const std::shared_ptr<Buffer>& value_offsets,
const std::shared_ptr<Array>& values,
const std::shared_ptr<Buffer>& null_bitmap = NULLPTR,
ListArray(std::shared_ptr<DataType> type, int64_t length,
std::shared_ptr<Buffer> value_offsets, std::shared_ptr<Array> values,
std::shared_ptr<Buffer> null_bitmap = NULLPTR,
int64_t null_count = kUnknownNullCount, int64_t offset = 0);

/// \brief Construct ListArray from array of offsets and child value array
Expand Down Expand Up @@ -1081,6 +1083,8 @@ class ARROW_EXPORT StructArray : public Array {
// count adjusted.
std::shared_ptr<Array> field(int pos) const;

const ArrayVector& fields() const;

/// Returns null if name not found
std::shared_ptr<Array> GetFieldByName(const std::string& name) const;

Expand All @@ -1095,7 +1099,7 @@ class ARROW_EXPORT StructArray : public Array {
private:
// For caching boxed child data
// XXX This is not handled in a thread-safe manner.
mutable std::vector<std::shared_ptr<Array>> boxed_fields_;
mutable ArrayVector boxed_fields_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -1369,6 +1373,9 @@ class ARROW_EXPORT DictionaryArray : public Array {
std::shared_ptr<Array> dictionary() const;
std::shared_ptr<Array> indices() const;

/// \brief Return the ith value of indices, cast to int64_t
int64_t GetValueIndex(int64_t i) const;

const DictionaryType* dict_type() const { return dict_type_; }

private:
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ normalization are also addressed.

## Development Status

Pre-alpha as of June 2019. API subject to change without notice.
Alpha/beta stage as of April 2020. API subject to change, possibly
without deprecation notices.
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include "arrow/dataset/dataset.h"
Expand Down
17 changes: 10 additions & 7 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ namespace arrow {
namespace dataset {

Fragment::Fragment(std::shared_ptr<Expression> partition_expression)
: partition_expression_(partition_expression ? partition_expression : scalar(true)) {}
: partition_expression_(std::move(partition_expression)) {
DCHECK_NE(partition_expression_, nullptr);
}

Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchema() { return schema_; }

Expand Down Expand Up @@ -70,6 +72,12 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
return MakeMapIterator(fn, std::move(batches_it));
}

Dataset::Dataset(std::shared_ptr<Schema> schema,
std::shared_ptr<Expression> partition_expression)
: schema_(std::move(schema)), partition_expression_(std::move(partition_expression)) {
DCHECK_NE(partition_expression_, nullptr);
}

Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanContext> context) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), context);
Expand All @@ -79,13 +87,8 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); }

FragmentIterator Dataset::GetFragments(std::shared_ptr<Expression> predicate) {
if (partition_expression_) {
predicate = predicate->Assume(*partition_expression_);
}

predicate = predicate->Assume(*partition_expression_);
return predicate->IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
}
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <functional>
Expand Down Expand Up @@ -74,19 +76,20 @@ class ARROW_DS_EXPORT Fragment {
virtual ~Fragment() = default;

protected:
explicit Fragment(std::shared_ptr<Expression> partition_expression = NULLPTR);
Fragment() = default;
explicit Fragment(std::shared_ptr<Expression> partition_expression);

std::shared_ptr<Expression> partition_expression_;
std::shared_ptr<Expression> partition_expression_ = scalar(true);
};

/// \brief A trivial Fragment that yields ScanTask out of a fixed set of
/// RecordBatch.
class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
public:
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
std::shared_ptr<Expression> = NULLPTR);
std::shared_ptr<Expression> = scalar(true));
explicit InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<Expression> = NULLPTR);
std::shared_ptr<Expression> = scalar(true));

Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override;

Expand Down Expand Up @@ -114,8 +117,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<std::shared_ptr<ScannerBuilder>> NewScan();

/// \brief GetFragments returns an iterator of Fragments given a predicate.
FragmentIterator GetFragments(std::shared_ptr<Expression> predicate);
FragmentIterator GetFragments();
FragmentIterator GetFragments(std::shared_ptr<Expression> predicate = scalar(true));

const std::shared_ptr<Schema>& schema() const { return schema_; }

Expand All @@ -140,14 +142,13 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
protected:
explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

Dataset(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> e)
: schema_(std::move(schema)), partition_expression_(std::move(e)) {}
Dataset() = default;
Dataset(std::shared_ptr<Schema> schema,
std::shared_ptr<Expression> partition_expression);

virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) = 0;

std::shared_ptr<Schema> schema_;
std::shared_ptr<Expression> partition_expression_;
std::shared_ptr<Expression> partition_expression_ = scalar(true);
};

/// \brief A Source which yields fragments wrapping a stream of record batches.
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
/// dataset with possible partitioning according to available
/// partitioning

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
Expand Down
Loading