Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@
*.so
*.dylib
.build_cache_dir
.vscode
MANIFEST
1 change: 1 addition & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray);
MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array);
MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, TimestampArray);
MAKE_PRIMITIVE_ARRAY_CASE(DATE, DateArray);
default:
return Status::NotImplemented(type->ToString());
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ INSTANTIATE_TEST_CASE_P(
RoundTripTests, TestRecordBatchParam,
::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
&MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
&MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary));
&MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary,
&MakeDateRecordBatch, // &MakeTimeRecordBatch,
));

void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
ipc::MockOutputStream mock;
Expand Down
26 changes: 22 additions & 4 deletions cpp/src/arrow/ipc/json-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,20 @@ class JsonArrayWriter : public ArrayVisitor {

Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }

Status Visit(const DateArray& array) override { return WritePrimitive(array); }

Status Visit(const TimeArray& array) override { return WritePrimitive(array); }

Status Visit(const TimestampArray& array) override { return WritePrimitive(array); }

Status Visit(const IntervalArray& array) override {
return Status::NotImplemented("interval");
}

Status Visit(const DecimalArray& array) override {
return Status::NotImplemented("decimal");
}

Status Visit(const ListArray& array) override {
WriteValidityField(array);
WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
Expand Down Expand Up @@ -829,7 +843,10 @@ class JsonArrayReader {

template <typename T>
typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
std::is_base_of<BooleanType, T>::value,
std::is_base_of<BooleanType, T>::value ||
std::is_base_of<DateType, T>::value ||
std::is_base_of<TimeType, T>::value ||
std::is_base_of<TimestampType, T>::value,
Status>::type
ReadArray(const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
Expand Down Expand Up @@ -939,6 +956,7 @@ class JsonArrayReader {
return Status::OK();
}


template <typename T>
typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type ReadArray(
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
Expand Down Expand Up @@ -1081,9 +1099,9 @@ class JsonArrayReader {
TYPE_CASE(DoubleType);
TYPE_CASE(StringType);
TYPE_CASE(BinaryType);
NOT_IMPLEMENTED_CASE(DATE);
NOT_IMPLEMENTED_CASE(TIMESTAMP);
NOT_IMPLEMENTED_CASE(TIME);
TYPE_CASE(DateType);
TYPE_CASE(TimestampType);
TYPE_CASE(TimeType);
NOT_IMPLEMENTED_CASE(INTERVAL);
TYPE_CASE(ListType);
TYPE_CASE(StructType);
Expand Down
77 changes: 76 additions & 1 deletion cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ static Status FloatFromFlatuffer(
return Status::OK();
}

static inline TimeUnit FlatbufferToTimeUnit(flatbuf::TimeUnit unit) {
switch (unit) {
case flatbuf::TimeUnit_SECOND:
return TimeUnit::SECOND;
case flatbuf::TimeUnit_MILLISECOND:
return TimeUnit::MILLI;
case flatbuf::TimeUnit_MICROSECOND:
return TimeUnit::MICRO;
case flatbuf::TimeUnit_NANOSECOND:
return TimeUnit::NANO;
}

return TimeUnit::SECOND; // Default
}


// Forward declaration
static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
DictionaryMemo* dictionary_memo, FieldOffset* offset);
Expand Down Expand Up @@ -165,11 +181,31 @@ static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
return Status::OK();
}

static inline flatbuf::TimeUnit TimeUnitToFlatbuffer(TimeUnit unit) {
switch (unit) {
case TimeUnit::SECOND:
return flatbuf::TimeUnit_SECOND;
case TimeUnit::MILLI:
return flatbuf::TimeUnit_MILLISECOND;
case TimeUnit::MICRO:
return flatbuf::TimeUnit_MICROSECOND;
case TimeUnit::NANO:
return flatbuf::TimeUnit_NANOSECOND;
}

return flatbuf::TimeUnit_SECOND; // Default
}

#define TIME_TO_FB(fbb, unit, type) \
flatbuf::Create ## type(fbb, TimeUnitToFlatbuffer(unit)).Union();


#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \
*out_type = flatbuf::Type_Int; \
*offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
break;


static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
switch (type) {
Expand All @@ -190,7 +226,30 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
*out = boolean();
return Status::OK();
case flatbuf::Type_Decimal:
case flatbuf::Type_Timestamp:
return Status::NotImplemented("Type Decimal is not implemented");
case flatbuf::Type_Date:
*out = date();
return Status::OK();
case flatbuf::Type_Time: {
auto unit = static_cast<const flatbuf::Time*>(type_data)->unit();
if ((unit < flatbuf::TimeUnit_MIN) || (unit > flatbuf::TimeUnit_MAX)) {
std::stringstream ss;
ss << "Unknown TimeUnit: " << unit << std::endl;
return Status::Invalid(ss.str());
}
*out = time(FlatbufferToTimeUnit(unit));
return Status::OK();
}
case flatbuf::Type_Timestamp: {
auto unit = static_cast<const flatbuf::Timestamp*>(type_data)->unit();
if ((unit < flatbuf::TimeUnit_MIN) || (unit > flatbuf::TimeUnit_MAX)) {
std::stringstream ss;
ss << "Unknown TimeUnit: " << unit << std::endl;
return Status::Invalid(ss.str());
}
*out = timestamp(FlatbufferToTimeUnit(unit));
return Status::OK();
}
case flatbuf::Type_List:
if (children.size() != 1) {
return Status::Invalid("List must have exactly 1 child field");
Expand Down Expand Up @@ -292,6 +351,22 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
case Type::UNION:
*out_type = flatbuf::Type_Union;
return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
case Type::DATE:
*out_type = flatbuf::Type_Date;
*offset = flatbuf::CreateDate(fbb).Union();
break;
case Type::TIME: {
auto& unit = static_cast<const TimeType&>(*type).unit;
*out_type = flatbuf::Type_Time;
*offset = TIME_TO_FB(fbb, unit, Time);
}
break;
case Type::TIMESTAMP: {
auto& unit = static_cast<const TimestampType&>(*type).unit;
*out_type = flatbuf::Type_Timestamp;
*offset = TIME_TO_FB(fbb, unit, Timestamp);
}
break;
default:
*out_type = flatbuf::Type_NONE; // Make clang-tidy happy
std::stringstream ss;
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/ipc/test-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,36 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
return Status::OK();
}

Status MakeDateRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = field("f0", date());
std::shared_ptr<Schema> schema(new Schema({f0}));

// Example data
std::shared_ptr<Array> date_array;
std::vector<bool> is_valid{true, true, false, true, true};
std::vector<int64_t> values{0, -7, 636390, 706397, 736390};
ArrayFromVector<DateType, int64_t>(is_valid, values, &date_array);

out->reset(new RecordBatch(schema, values.size(), {date_array}));
return Status::OK();
}

Status MakeTimeRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = field("t0", timestamp(TimeUnit::MILLI));
std::shared_ptr<Schema> schema(new Schema({f0}));

// Example data
std::shared_ptr<Array> time_array;
std::vector<bool> is_valid{true, true, false, true, true};
std::vector<int64_t> values{0, -27, 390, 7097, 36390};
ArrayFromVector<TimeType, int64_t>(is_valid, values, &time_array);

out->reset(new RecordBatch(schema, values.size(), {time_array}));
return Status::OK();
}

} // namespace ipc
} // namespace arrow

Expand Down
8 changes: 3 additions & 5 deletions cpp/src/arrow/pretty_print.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,11 @@ class ArrayPrinter : public ArrayVisitor {

Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }

Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); }
Status Visit(const DateArray& array) override { return WritePrimitive(array); }

Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); }
Status Visit(const TimeArray& array) override { return WritePrimitive(array); }

Status Visit(const TimestampArray& array) override {
return Status::NotImplemented("timestamp");
}
Status Visit(const TimestampArray& array) override { return WritePrimitive(array); }

Status Visit(const IntervalArray& array) override {
return Status::NotImplemented("interval");
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct Type {
// Variable-length bytes (no guarantee of UTF8-ness)
BINARY,

// By default, int32 days since the UNIX epoch
// By default, int64 days since the UNIX epoch
DATE,

// Exact timestamp encoded with int64 since UNIX epoch
Expand Down Expand Up @@ -439,7 +439,12 @@ struct ARROW_EXPORT DateType : public FixedWidthType {
static std::string name() { return "date"; }
};

enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
enum class TimeUnit : char {
SECOND = 0,
MILLI = 1,
MICRO = 2,
NANO = 3,
};

struct ARROW_EXPORT TimeType : public FixedWidthType {
static constexpr Type::type type_id = Type::TIME;
Expand Down
16 changes: 4 additions & 12 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,12 @@ _NUMERIC_TYPE_DECL(UInt64);
_NUMERIC_TYPE_DECL(HalfFloat);
_NUMERIC_TYPE_DECL(Float);
_NUMERIC_TYPE_DECL(Double);
_NUMERIC_TYPE_DECL(Date);
_NUMERIC_TYPE_DECL(Time);
_NUMERIC_TYPE_DECL(Timestamp);

#undef _NUMERIC_TYPE_DECL

struct DateType;
using DateArray = NumericArray<DateType>;
using DateBuilder = NumericBuilder<DateType>;

struct TimeType;
using TimeArray = NumericArray<TimeType>;
using TimeBuilder = NumericBuilder<TimeType>;

struct TimestampType;
using TimestampArray = NumericArray<TimestampType>;
using TimestampBuilder = NumericBuilder<TimestampType>;

struct IntervalType;
using IntervalArray = NumericArray<IntervalType>;

Expand All @@ -125,6 +116,7 @@ std::shared_ptr<DataType> ARROW_EXPORT float64();
std::shared_ptr<DataType> ARROW_EXPORT utf8();
std::shared_ptr<DataType> ARROW_EXPORT binary();
std::shared_ptr<DataType> ARROW_EXPORT date();
std::shared_ptr<DataType> ARROW_EXPORT time();

} // namespace arrow

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct TypeTraits<Int64Type> {
template <>
struct TypeTraits<DateType> {
using ArrayType = DateArray;
// using BuilderType = DateBuilder;
using BuilderType = DateBuilder;

static inline int64_t bytes_required(int64_t elements) {
return elements * sizeof(int64_t);
Expand All @@ -133,7 +133,7 @@ struct TypeTraits<DateType> {
template <>
struct TypeTraits<TimestampType> {
using ArrayType = TimestampArray;
// using BuilderType = TimestampBuilder;
using BuilderType = TimestampBuilder;

static inline int64_t bytes_required(int64_t elements) {
return elements * sizeof(int64_t);
Expand All @@ -144,7 +144,7 @@ struct TypeTraits<TimestampType> {
template <>
struct TypeTraits<TimeType> {
using ArrayType = TimeArray;
// using BuilderType = TimestampBuilder;
using BuilderType = TimeBuilder;

static inline int64_t bytes_required(int64_t elements) {
return elements * sizeof(int64_t);
Expand Down