185 changes: 89 additions & 96 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1189,65 +1189,116 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
auto f0 = field("f0", ::arrow::date32());
auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
std::shared_ptr<::arrow::Field> f3;
if (nanos_as_micros) {
f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
} else {
f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
}
auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
auto f3 = field("f3", ::arrow::timestamp(f3_unit));
auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));

std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));

std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
1489272000000, 1489272000000, 1489273000000};
std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000,
1489272000000, 1489272000000, 1489273000000};
std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
1489272, 1489272, 1489273};

std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
if (nanos_as_micros) {
ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values,
&a3);
} else {
ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values,
&a3);
}
ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
&a1);
ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
&a2);
auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);

std::vector<std::shared_ptr<::arrow::Column>> columns = {
std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};

*out = Table::Make(schema, columns);
}

TEST(TestArrowReadWrite, DateTimeTypes) {
std::shared_ptr<Table> table;
std::shared_ptr<Table> table, result;
MakeDateTimeTypesTable(&table);

// Use deprecated INT96 type
std::shared_ptr<Table> result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
table, false /* use_threads */, table->num_rows(), {}, &result,
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));

// Cast nanoseconds to microseconds and use INT64 physical type
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
std::shared_ptr<Table> expected;
MakeDateTimeTypesTable(&table, true);

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}

TEST(TestArrowReadWrite, UseDeprecatedInt96) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
using ::arrow::schema;

std::vector<bool> is_valid = {true, true, true, false, true, true};

auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
1489272001, 1489272000, 1489273000};
std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
1489272000001, 1489272000000, 1489273000000};
std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
1489271000000000LL, 1489272000000001LL,
1489272000000000LL, 1489273000000000LL};

std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);

// Each input is typed with a unique TimeUnit
auto input_schema = schema(
{field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
auto input = Table::Make(
input_schema,
{std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});

// When reading parquet files, all int96 schema fields are converted to
// timestamp nanoseconds
auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
field("f_ns", t_ns)});
auto ex_result = Table::Make(
ex_schema,
{std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});

std::shared_ptr<Table> result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &result,
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));

// Ensure enable_deprecated_int96_timestamps has precedence over
// coerce_timestamps.
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
input->num_rows(), {}, &result,
ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps()
->coerce_timestamps(TimeUnit::MILLI)
->build()));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
}
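
A minimal usage sketch of the writer properties these two tests exercise; it assumes only the builder calls that appear above (enable_deprecated_int96_timestamps(), coerce_timestamps(), build()) and the parquet/arrow/writer.h header where ArrowWriterProperties is declared in this era of the codebase. This is an illustration, not code from the patch:

#include <memory>

#include "arrow/type.h"            // ::arrow::TimeUnit
#include "parquet/arrow/writer.h"  // parquet::ArrowWriterProperties

int main() {
  // Int96 always wins: timestamps are stored as INT96 and read back as
  // timestamp[ns]; per UseDeprecatedInt96 above, coerce_timestamps() is ignored.
  std::shared_ptr<parquet::ArrowWriterProperties> int96_props =
      parquet::ArrowWriterProperties::Builder()
          .enable_deprecated_int96_timestamps()
          ->coerce_timestamps(::arrow::TimeUnit::MILLI)  // has no effect here
          ->build();

  // Default properties: INT64 storage, and nanosecond columns are cast to
  // microseconds on write (see DateTimeTypes above).
  std::shared_ptr<parquet::ArrowWriterProperties> default_props =
      parquet::ArrowWriterProperties::Builder().build();

  return (int96_props && default_props) ? 0 : 1;
}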

TEST(TestArrowReadWrite, CoerceTimestamps) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
@@ -1293,6 +1344,12 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
{std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});

std::shared_ptr<Table> milli_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &milli_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));

// Result when coercing to microseconds
auto s3 = std::shared_ptr<::arrow::Schema>(
new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
@@ -1302,13 +1359,6 @@
{std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});

std::shared_ptr<Table> milli_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &milli_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));

std::shared_ptr<Table> micro_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &micro_result,
@@ -1453,65 +1503,6 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
}

// Regression for ARROW-2802
TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
using ::arrow::Column;
using ::arrow::default_memory_pool;
using ::arrow::Field;
using ::arrow::Schema;
using ::arrow::Table;
using ::arrow::TimestampBuilder;
using ::arrow::TimestampType;
using ::arrow::TimeUnit;

auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);

TimestampBuilder builder(timestamp_type, default_memory_pool());
for (std::int64_t ii = 0; ii < 10; ++ii) {
ASSERT_OK(builder.Append(1000000000L * ii));
}
std::shared_ptr<Array> values;
ASSERT_OK(builder.Finish(&values));

std::vector<std::shared_ptr<Field>> fields;
auto field = std::make_shared<Field>("nanos", timestamp_type);
fields.emplace_back(field);

auto schema = std::make_shared<Schema>(fields);

std::vector<std::shared_ptr<Column>> columns;
auto column = std::make_shared<Column>("nanos", values);
columns.emplace_back(column);

auto table = Table::Make(schema, columns);

auto arrow_writer_properties = ArrowWriterProperties::Builder()
.coerce_timestamps(TimeUnit::MICRO)
->enable_deprecated_int96_timestamps()
->build();

std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
arrow_writer_properties);

ASSERT_EQ(table->num_columns(), result->num_columns());
ASSERT_EQ(table->num_rows(), result->num_rows());

auto actual_column = result->column(0);
auto data = actual_column->data();
auto expected_values =
static_cast<::arrow::NumericArray<TimestampType>*>(values.get())->raw_values();
for (int ii = 0; ii < data->num_chunks(); ++ii) {
auto chunk =
static_cast<::arrow::NumericArray<TimestampType>*>(data->chunk(ii).get());
auto values = chunk->raw_values();
for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) {
// Check that the nanos have been converted to micros
ASSERT_EQ(*expected_values / 1000, values[jj]);
}
}
}

void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
@@ -2284,11 +2275,13 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));

TEST(TestImpalaConversion, NanosecondToImpala) {
TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
// June 20, 2017 16:32:56 and 123456789 nanoseconds
int64_t nanoseconds = INT64_C(1497976376123456789);
Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};

Int96 calculated;

Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
ASSERT_EQ(expected, calculated);
}
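
The expected Int96 above can be verified by hand. The following is a self-contained sketch, not library code; it assumes the layout implied by the impala_timestamp_to_nanoseconds helper removed from reader.cc below (value[2] is the Julian day, value[0..1] are the nanoseconds elapsed within that day as a 64-bit integer) and, like that helper's memcpy, a little-endian host:

#include <cassert>
#include <cstdint>
#include <cstring>
#include <iostream>

int main() {
  const int64_t kJulianToUnixEpochDays = 2440588LL;
  const int64_t kNanosecondsInADay = 86400LL * 1000 * 1000 * 1000;

  // June 20, 2017 16:32:56.123456789 UTC as nanoseconds since the Unix epoch.
  const int64_t nanoseconds = INT64_C(1497976376123456789);

  const int64_t days_since_epoch = nanoseconds / kNanosecondsInADay;  // 17337
  const int64_t nanos_of_day = nanoseconds % kNanosecondsInADay;      // 59576123456789

  uint32_t value[3];
  value[2] = static_cast<uint32_t>(days_since_epoch + kJulianToUnixEpochDays);
  // The low and high words of the 64-bit nanosecond count land in value[0] and
  // value[1] on a little-endian machine.
  std::memcpy(value, &nanos_of_day, sizeof(nanos_of_day));

  // Matches the expected Int96 in the test: {632093973, 13871, 2457925}.
  assert(value[0] == UINT32_C(632093973));
  assert(value[1] == UINT32_C(13871));
  assert(value[2] == UINT32_C(2457925));
  std::cout << value[0] << " " << value[1] << " " << value[2] << std::endl;
  return 0;
}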
16 changes: 2 additions & 14 deletions cpp/src/parquet/arrow/reader.cc
@@ -70,18 +70,6 @@ namespace arrow {

using ::arrow::BitUtil::BytesForBits;

constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
constexpr int64_t kMillisecondsInADay = 86400000LL;
constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;

static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
int64_t nanoseconds = 0;

memcpy(&nanoseconds, &impala_timestamp.value, sizeof(int64_t));
return days_since_epoch * kNanosecondsInADay + nanoseconds;
}

template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;

@@ -1001,7 +989,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {

auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
for (int64_t i = 0; i < length; i++) {
*data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
*data_ptr++ = Int96GetNanoSeconds(values[i]);
}

if (reader->nullable_values()) {
@@ -1029,7 +1017,7 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());

for (int64_t i = 0; i < length; i++) {
*out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
*out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
}

if (reader->nullable_values()) {
73 changes: 47 additions & 26 deletions cpp/src/parquet/arrow/schema.cc
@@ -423,45 +423,66 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
return Status::OK();
}

static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) {
switch (time_unit) {
case ::arrow::TimeUnit::MILLI:
return LogicalType::TIMESTAMP_MILLIS;
case ::arrow::TimeUnit::MICRO:
return LogicalType::TIMESTAMP_MICROS;
case ::arrow::TimeUnit::SECOND:
case ::arrow::TimeUnit::NANO:
// No equivalent parquet logical type.
break;
}

return LogicalType::NONE;
}
Review comment thread on this hunk:

@wesm (Member), Dec 14, 2018:
Must be mindful of parquet v1/v2 issues here -- @xhochy could you take a look at this?

@fsaintjacques (Contributor, author), Dec 14, 2018:
The python unit tests found another issue, I'll add a c++ test to catch this earlier.

@fsaintjacques (Contributor, author):
This was fixed in GetTimestampMetadata, it's now closer to the original.

static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
const ArrowWriterProperties& properties,
ParquetType::type* physical_type,
LogicalType::type* logical_type) {
auto unit = type.unit();
*physical_type = ParquetType::INT64;
const bool coerce = properties.coerce_timestamps_enabled();
const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit();

if (properties.coerce_timestamps_enabled()) {
auto coerce_unit = properties.coerce_timestamps_unit();
if (coerce_unit == ::arrow::TimeUnit::MILLI) {
*logical_type = LogicalType::TIMESTAMP_MILLIS;
} else if (coerce_unit == ::arrow::TimeUnit::MICRO) {
*logical_type = LogicalType::TIMESTAMP_MICROS;
} else {
return Status::NotImplemented(
"Can only coerce Arrow timestamps to milliseconds"
" or microseconds");
// The user is explicitly asking for Impala int96 encoding; there is no
// logical type.
if (properties.support_deprecated_int96_timestamps()) {
*physical_type = ParquetType::INT96;
return Status::OK();
}

*physical_type = ParquetType::INT64;
*logical_type = LogicalTypeFromArrowTimeUnit(unit);

// The user is requesting that all timestamp columns are cast to a specific
// type. Only two TimeUnits are supported by arrow-parquet.
if (coerce) {
switch (unit) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
break;
case ::arrow::TimeUnit::NANO:
case ::arrow::TimeUnit::SECOND:
return Status::NotImplemented(
"Can only coerce Arrow timestamps to milliseconds"
" or microseconds");
}

return Status::OK();
}

if (unit == ::arrow::TimeUnit::MILLI) {
*logical_type = LogicalType::TIMESTAMP_MILLIS;
} else if (unit == ::arrow::TimeUnit::MICRO) {
// Until ARROW-3729 is resolved, nanoseconds are explicitly converted to
// int64 microseconds when deprecated int96 is not requested.
if (type.unit() == ::arrow::TimeUnit::NANO)
*logical_type = LogicalType::TIMESTAMP_MICROS;
} else if (unit == ::arrow::TimeUnit::NANO) {
if (properties.support_deprecated_int96_timestamps()) {
*physical_type = ParquetType::INT96;
// No corresponding logical type
} else {
*logical_type = LogicalType::TIMESTAMP_MICROS;
}
} else {
else if (type.unit() == ::arrow::TimeUnit::SECOND)
return Status::NotImplemented(
"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
"Parquet.");
}

return Status::OK();
}
} // namespace arrow
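
Because the hunk above interleaves removed and added lines, here is an illustrative standalone reconstruction of the decision the new GetTimestampMetadata implements. The enums Physical, Logical, Unit and the function TimestampMetadata are stand-ins invented for this sketch; the real code uses ParquetType, LogicalType and ::arrow::TimeUnit, and reports errors through Status rather than exceptions:

#include <stdexcept>
#include <utility>

enum class Physical { INT64, INT96 };
enum class Logical { NONE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS };
enum class Unit { SECOND, MILLI, MICRO, NANO };

// Mirrors the branch order of the patch: int96 first, then coercion, then the
// column's own unit, with NANO downgraded to microseconds until ARROW-3729.
std::pair<Physical, Logical> TimestampMetadata(Unit column_unit, bool int96,
                                               bool coerce, Unit coerce_unit) {
  if (int96) {
    // Impala-style INT96 storage carries no logical type and overrides coercion.
    return {Physical::INT96, Logical::NONE};
  }

  const Unit unit = coerce ? coerce_unit : column_unit;
  if (coerce && (unit == Unit::SECOND || unit == Unit::NANO)) {
    throw std::runtime_error(
        "Can only coerce Arrow timestamps to milliseconds or microseconds");
  }

  switch (unit) {
    case Unit::MILLI:
      return {Physical::INT64, Logical::TIMESTAMP_MILLIS};
    case Unit::MICRO:
      return {Physical::INT64, Logical::TIMESTAMP_MICROS};
    case Unit::NANO:
      // Written as int64 microseconds until a nanosecond logical type exists.
      return {Physical::INT64, Logical::TIMESTAMP_MICROS};
    case Unit::SECOND:
      throw std::runtime_error("SECOND timestamps are not supported");
  }
  return {Physical::INT64, Logical::NONE};  // unreachable
}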

Status FieldToNode(const std::shared_ptr<Field>& field,
const WriterProperties& properties,
@@ -698,7 +719,7 @@ int32_t DecimalSize(int32_t precision) {
}
DCHECK(false);
return -1;
}
} // namespace arrow

} // namespace arrow
} // namespace parquet