From 4f759a2c45c6a76c6684ac66c059869b3fd0ea5c Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Sun, 26 Jul 2015 11:37:59 -0700 Subject: [PATCH 1/2] ORC-9. Create a vector type for timestamp columns that separates out the seconds from the nanoseconds. --- c++/include/orc/Vector.hh | 18 +++++++++ c++/src/ColumnPrinter.cc | 32 +++++----------- c++/src/ColumnReader.cc | 48 ++++++++++++------------ c++/src/ColumnReader.hh | 7 ++++ c++/src/Reader.cc | 43 +++++++++++++++++---- c++/src/Vector.cc | 28 ++++++++++++++ c++/test/TestColumnPrinter.cc | 38 ++++++++++++------- c++/test/TestColumnReader.cc | 70 +++++++++++++++++++++++------------ 8 files changed, 194 insertions(+), 90 deletions(-) diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh index e97c51e433..ff732d0223 100644 --- a/c++/include/orc/Vector.hh +++ b/c++/include/orc/Vector.hh @@ -294,6 +294,24 @@ namespace orc { friend class DecimalHive11ColumnReader; }; + /** + * A column vector batch for storing timestamp values. + * The timestamps are stored split into the time_t value (seconds since + * 1 Jan 1970 00:00:00) and the nanoseconds within the time_t value. + */ + struct TimestampVectorBatch: public ColumnVectorBatch { + TimestampVectorBatch(uint64_t capacity, MemoryPool& pool); + virtual ~TimestampVectorBatch(); + std::string toString() const; + void resize(uint64_t capacity); + + // the number of seconds past 1 Jan 1970 00:00 UTC (aka time_t) + DataBuffer data; + + // the nanoseconds of each value + DataBuffer nanoseconds; + }; + } #endif diff --git a/c++/src/ColumnPrinter.cc b/c++/src/ColumnPrinter.cc index 190441c2bb..1367b02011 100644 --- a/c++/src/ColumnPrinter.cc +++ b/c++/src/ColumnPrinter.cc @@ -67,8 +67,8 @@ namespace orc { class TimestampColumnPrinter: public ColumnPrinter { private: - const int64_t* data; - time_t epoch; + const int64_t* seconds; + const int64_t* nanoseconds; public: TimestampColumnPrinter(std::string&, const Type&); @@ -667,33 +667,18 @@ namespace orc { const Type& type ): ColumnPrinter(buffer, type) { - struct tm epochTm; - epochTm.tm_sec = 0; - epochTm.tm_min = 0; - epochTm.tm_hour = 0; - epochTm.tm_mday = 1; - epochTm.tm_mon = 0; - epochTm.tm_year = 70; - epochTm.tm_isdst = 0; - epoch = mktime(&epochTm); + // PASS } void TimestampColumnPrinter::printRow(uint64_t rowId) { - const int64_t NANOS_PER_SECOND = 1000000000; const int64_t NANO_DIGITS = 9; if (hasNulls && !notNull[rowId]) { writeString(buffer, "null"); } else { - int64_t nanos = data[rowId] % NANOS_PER_SECOND; - time_t seconds = - static_cast(data[rowId] / NANOS_PER_SECOND) + epoch; - // make sure the nanos are positive - if (nanos < 0) { - seconds -= 1; - nanos = -nanos; - } + int64_t nanos = nanoseconds[rowId]; + time_t secs = static_cast(seconds[rowId]); struct tm tmValue; - localtime_r(&seconds, &tmValue); + localtime_r(&secs, &tmValue); char timeBuffer[20]; strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue); writeChar(buffer, '"'); @@ -720,6 +705,9 @@ namespace orc { void TimestampColumnPrinter::reset(const ColumnVectorBatch& batch) { ColumnPrinter::reset(batch); - data = dynamic_cast(batch).data.data(); + const TimestampVectorBatch& ts = + dynamic_cast(batch); + seconds = ts.data.data(); + nanoseconds = ts.nanoseconds.data(); } } diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc index 78fb15f4eb..7d21ce1b0e 100644 --- a/c++/src/ColumnReader.cc +++ b/c++/src/ColumnReader.cc @@ -261,10 +261,11 @@ namespace orc { numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : 0); } - class TimestampColumnReader: public IntegerColumnReader { + class TimestampColumnReader: public ColumnReader { private: + std::unique_ptr secondsRle; std::unique_ptr nanoRle; - DataBuffer nanoBuffer; + const int64_t epochOffset; public: TimestampColumnReader(const Type& type, StripeStreams& stripe); @@ -280,10 +281,13 @@ namespace orc { TimestampColumnReader::TimestampColumnReader(const Type& type, StripeStreams& stripe - ): IntegerColumnReader(type, - stripe), - nanoBuffer(memoryPool, 1024){ + ): ColumnReader(type, stripe), + epochOffset(stripe.getEpochOffset()) { RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); + secondsRle = createRleDecoder(stripe.getStream(columnId, + proto::Stream_Kind_DATA, + true), + true, vers, memoryPool); nanoRle = createRleDecoder(stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true), @@ -295,41 +299,37 @@ namespace orc { } uint64_t TimestampColumnReader::skip(uint64_t numValues) { - numValues = IntegerColumnReader::skip(numValues); + numValues = ColumnReader::skip(numValues); + secondsRle->skip(numValues); nanoRle->skip(numValues); return numValues; } void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, - uint64_t numValues, - char *notNull) { + uint64_t numValues, + char *notNull) { ColumnReader::next(rowBatch, numValues, notNull); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; - int64_t* pStamp = dynamic_cast(rowBatch).data.data(); - - // make sure that nanoBuffer is large enough - if (numValues > nanoBuffer.size()) { - nanoBuffer.resize(numValues); - } - - rle->next(pStamp, numValues, notNull); - nanoRle->next(nanoBuffer.data(), numValues, notNull); + TimestampVectorBatch& timestampBatch = + dynamic_cast(rowBatch); + int64_t *secsBuffer = timestampBatch.data.data(); + secondsRle->next(secsBuffer, numValues, notNull); + int64_t *nanoBuffer = timestampBatch.nanoseconds.data(); + nanoRle->next(nanoBuffer, numValues, notNull); // Construct the values for(uint64_t i=0; i < numValues; i++) { if (notNull == nullptr || notNull[i]) { - int64_t nanosec = nanoBuffer[i] >> 3; uint64_t zeros = nanoBuffer[i] & 0x7; + nanoBuffer[i] >>= 3; if (zeros != 0) { for(uint64_t j = 0; j <= zeros; ++j) { - nanosec *= 10; + nanoBuffer[i] *= 10; } } - pStamp[i] = pStamp[i] * 1000000000 + 1420070400000000000; - if (pStamp[i] >= 0) { - pStamp[i] += nanosec; - } else { - pStamp[i] -= nanosec; + secsBuffer[i] += epochOffset; + if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) { + secsBuffer[i] -= 1; } } } diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh index b90c9428ea..73db911613 100644 --- a/c++/src/ColumnReader.hh +++ b/c++/src/ColumnReader.hh @@ -63,6 +63,13 @@ namespace orc { * Get the memory pool for this reader. */ virtual MemoryPool& getMemoryPool() const = 0; + + /** + * Get the number of seconds between the ORC epoch and Unix epoch. + * ORC epoch is 1 Jan 2015 00:00:00 local. + * Unix epoch is 1 Jan 1970 00:00:00 UTC. + */ + virtual int64_t getEpochOffset() const = 0; }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 684b9e4cc9..6b1f25b052 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -854,6 +854,8 @@ namespace orc { class ReaderImpl : public Reader { private: + const int64_t epochOffset; + // inputs std::unique_ptr stream; ReaderOptions options; @@ -993,12 +995,25 @@ namespace orc { } } + int64_t getEpochOffset() { + struct tm epoch; + epoch.tm_sec = 0; + epoch.tm_min = 0; + epoch.tm_hour = 0; + epoch.tm_mday = 1; + epoch.tm_mon = 0; + epoch.tm_year = 2015 - 1900; + epoch.tm_isdst = 0; + return static_cast(mktime(&epoch)); + } + ReaderImpl::ReaderImpl(std::unique_ptr input, const ReaderOptions& opts, std::unique_ptr _postscript, std::unique_ptr _footer, uint64_t _footerStart - ): stream(std::move(input)), + ): epochOffset(getEpochOffset()), + stream(std::move(input)), options(opts), footerStart(_footerStart), memoryPool(*opts.getMemoryPool()), @@ -1334,13 +1349,15 @@ namespace orc { const uint64_t stripeStart; InputStream& input; MemoryPool& memoryPool; + const int64_t epochOffset; public: StripeStreamsImpl(const ReaderImpl& reader, const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& input, - MemoryPool& memoryPool); + MemoryPool& memoryPool, + int64_t epochOffset); virtual ~StripeStreamsImpl(); @@ -1356,18 +1373,22 @@ namespace orc { bool shouldStream) const override; MemoryPool& getMemoryPool() const override; + + int64_t getEpochOffset() const override; }; StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader, const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& _input, - MemoryPool& _memoryPool + MemoryPool& _memoryPool, + int64_t _epochOffset ): reader(_reader), footer(_footer), stripeStart(_stripeStart), input(_input), - memoryPool(_memoryPool) { + memoryPool(_memoryPool), + epochOffset(_epochOffset) { // PASS } @@ -1383,10 +1404,15 @@ namespace orc { return reader.getSelectedColumns(); } - proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId) const { + proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId + ) const { return footer.columns(static_cast(columnId)); } + int64_t StripeStreamsImpl::getEpochOffset() const { + return epochOffset; + } + std::unique_ptr StripeStreamsImpl::getStream(int64_t columnId, proto::Stream_Kind kind, @@ -1426,7 +1452,8 @@ namespace orc { StripeStreamsImpl stripeStreams(*this, currentStripeFooter, currentStripeInfo.offset(), *(stream.get()), - memoryPool); + memoryPool, + epochOffset); reader = buildReader(*(schema.get()), stripeStreams); } @@ -1479,7 +1506,6 @@ namespace orc { case SHORT: case INT: case LONG: - case TIMESTAMP: case DATE: result = new LongVectorBatch(capacity, memoryPool); break; @@ -1493,6 +1519,9 @@ namespace orc { case VARCHAR: result = new StringVectorBatch(capacity, memoryPool); break; + case TIMESTAMP: + result = new TimestampVectorBatch(capacity, memoryPool); + break; case STRUCT: result = new StructVectorBatch(capacity, memoryPool); for(uint64_t i=0; i < type.getSubtypeCount(); ++i) { diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc index e65c69b3cc..55167f4a4a 100644 --- a/c++/src/Vector.cc +++ b/c++/src/Vector.cc @@ -303,4 +303,32 @@ namespace orc { std::string Decimal::toString() const { return value.toDecimalString(scale); } + + TimestampVectorBatch::TimestampVectorBatch(uint64_t capacity, + MemoryPool& pool + ): ColumnVectorBatch(capacity, + pool), + data(pool, capacity), + nanoseconds(pool, capacity) { + // PASS + } + + TimestampVectorBatch::~TimestampVectorBatch() { + // PASS + } + + std::string TimestampVectorBatch::toString() const { + std::ostringstream buffer; + buffer << "Timestamp vector <" << numElements << " of " << capacity << ">"; + return buffer.str(); + } + + void TimestampVectorBatch::resize(uint64_t cap) { + if (capacity < cap) { + ColumnVectorBatch::resize(cap); + data.resize(cap); + nanoseconds.resize(cap); + } + } + } diff --git a/c++/test/TestColumnPrinter.cc b/c++/test/TestColumnPrinter.cc index a25bf3b67c..a2afdb3c0a 100644 --- a/c++/test/TestColumnPrinter.cc +++ b/c++/test/TestColumnPrinter.cc @@ -128,21 +128,33 @@ namespace orc { std::string line; std::unique_ptr type = createPrimitiveType(TIMESTAMP); std::unique_ptr printer = createColumnPrinter(line, *type); - LongVectorBatch batch(1024, *getDefaultPool()); + TimestampVectorBatch batch(1024, *getDefaultPool()); batch.numElements = 12; batch.hasNulls = false; - batch.data[0] = 1420070400000000000; - batch.data[1] = 963270000000000000; - batch.data[2] = 1426168859000000000; - batch.data[3] = 1426168859000000001; - batch.data[4] = 1426168859000000010; - batch.data[5] = 1426168859000000100; - batch.data[6] = 1426168859000001000; - batch.data[7] = 1426168859000010000; - batch.data[8] = 1426168859000100000; - batch.data[9] = 1426168859001000000; - batch.data[10] = 1426168859010000000; - batch.data[11] = 1426168859100000000; + batch.data[0] = 1420099200; + batch.data[1] = 963298800; + batch.data[2] = 1426197659; + batch.data[3] = 1426197659; + batch.data[4] = 1426197659; + batch.data[5] = 1426197659; + batch.data[6] = 1426197659; + batch.data[7] = 1426197659; + batch.data[8] = 1426197659; + batch.data[9] = 1426197659; + batch.data[10] = 1426197659; + batch.data[11] = 1426197659; + batch.nanoseconds[0] = 0; + batch.nanoseconds[1] = 0; + batch.nanoseconds[2] = 0; + batch.nanoseconds[3] = 1; + batch.nanoseconds[4] = 10; + batch.nanoseconds[5] = 100; + batch.nanoseconds[6] = 1000; + batch.nanoseconds[7] = 10000; + batch.nanoseconds[8] = 100000; + batch.nanoseconds[9] = 1000000; + batch.nanoseconds[10] = 10000000; + batch.nanoseconds[11] = 100000000; const char *expected[] = {"\"2015-01-01 00:00:00.0\"", "\"2000-07-11 00:00:00.0\"", "\"2015-03-12 15:00:59.0\"", diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc index 4c669d09c2..41231bc69a 100644 --- a/c++/test/TestColumnReader.cc +++ b/c++/test/TestColumnReader.cc @@ -48,6 +48,11 @@ class MockStripeStreams: public StripeStreams { MemoryPool& getMemoryPool() const { return *getDefaultPool(); } + + // the epoch offset for America/Los_Angeles + int64_t getEpochOffset() const { + return 1420099200; + } }; MockStripeStreams::~MockStripeStreams() { @@ -2634,17 +2639,19 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) { std::unique_ptr reader = buildReader(*rowType, streams); - LongVectorBatch *longBatch = new LongVectorBatch(1024, *getDefaultPool()); + TimestampVectorBatch *longBatch = + new TimestampVectorBatch(1024, *getDefaultPool()); StructVectorBatch batch(1024, *getDefaultPool()); batch.fields.push_back(longBatch); - // Test values are nanoseconds since 1970-01-01 00:00:00.0 - int64_t test_vals[] = { - 1368178850110000000, // 2013-05-10 10:40:50.11 - 1402483311120000000, // 2014-06-11 11:41:51.12 - 1436701372130000000, // 2015-07-12 12:42:52.13 - 1471092233140000000 // 2016-08-13 13:43:53.14 - }; + const char *(expected[]) = {"Fri May 10 10:40:50 2013\n", + "Wed Jun 11 11:41:51 2014\n", + "Sun Jul 12 12:42:52 2015\n", + "Sat Aug 13 13:43:53 2016\n"}; + int64_t expected_nano[] = {110000000, + 120000000, + 130000000, + 140000000}; int vals_ix = 0; reader->next(batch, 3, 0); @@ -2658,7 +2665,9 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) { EXPECT_EQ(0, longBatch->notNull[i]); } else { EXPECT_EQ(1, longBatch->notNull[i]); - EXPECT_EQ(test_vals[vals_ix], longBatch->data[i]); + time_t time = static_cast(longBatch->data[i]); + EXPECT_STREQ(expected[vals_ix], ctime(&time)); + EXPECT_EQ(expected_nano[vals_ix], longBatch->nanoseconds[i]); vals_ix++; } } @@ -2675,7 +2684,9 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) { EXPECT_EQ(0, longBatch->notNull[i]); } else { EXPECT_EQ(1, longBatch->notNull[i]); - EXPECT_EQ(test_vals[vals_ix], longBatch->data[i]); + time_t time = static_cast(longBatch->data[i]); + EXPECT_STREQ(expected[vals_ix], ctime(&time)); + EXPECT_EQ(expected_nano[vals_ix], longBatch->nanoseconds[i]); vals_ix++; } } @@ -2739,22 +2750,31 @@ TEST(TestColumnReader, testTimestamp) { std::unique_ptr reader = buildReader(*rowType, streams); - LongVectorBatch *longBatch = new LongVectorBatch(1024, *getDefaultPool()); + TimestampVectorBatch *longBatch = + new TimestampVectorBatch(1024, *getDefaultPool()); StructVectorBatch batch(1024, *getDefaultPool()); batch.fields.push_back(longBatch); - // Test values are nanoseconds since 1970-01-01 00:00:00.0 - const int64_t expected[] = {952873200000000000, // 2000-03-12 15:00:00.0 - 953553600123456789, // 2000-03-20 12:00:00.123456789 - -2208988800000000000, // 1900-01-01 00:00:00.0 - -2198229903190000000, // 1900-05-05 12:34:56.19 - -2166693903190100000, // 1901-05-05 12:34:56.1901 - -2135157903190200000, // 1902-05-05 12:34:56.1902 - -2103621903190300000, // 1903-05-05 12:34:56.1903 - -2071999503190400000, // 1904-05-05 12:34:56.1904 - -2040463503190500000, // 1905-05-05 12:34:56.1905 - -1882697103191000000 // 1910-05-05 12:34:56.191 - }; + const char *(expected[]) = {"Sun Mar 12 15:00:00 2000\n", + "Mon Mar 20 12:00:00 2000\n", + "Mon Jan 1 00:00:00 1900\n", + "Sat May 5 12:34:56 1900\n", + "Sun May 5 12:34:56 1901\n", + "Mon May 5 12:34:56 1902\n", + "Tue May 5 12:34:56 1903\n", + "Thu May 5 12:34:56 1904\n", + "Fri May 5 12:34:56 1905\n", + "Thu May 5 12:34:56 1910\n"}; + const int64_t expectedNano[] = {0, + 123456789, + 0, + 190000000, + 190100000, + 190200000, + 190300000, + 190400000, + 190500000, + 191000000}; reader->next(batch, 10, 0); ASSERT_EQ(10, batch.numElements); @@ -2763,7 +2783,9 @@ TEST(TestColumnReader, testTimestamp) { ASSERT_EQ(true, !longBatch->hasNulls); for (size_t i = 0; i < batch.numElements; ++i) { - EXPECT_EQ(expected[i], longBatch->data[i]) << "Wrong value at " << i; + time_t time = static_cast(longBatch->data[i]); + EXPECT_STREQ(expected[i], ctime(&time)) << "Wrong value at " << i; + EXPECT_EQ(expectedNano[i], longBatch->nanoseconds[i]); } } From f8ab57c4304dc059f2fb355429980f59902f8995 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Mon, 27 Jul 2015 08:54:10 -0700 Subject: [PATCH 2/2] Add additional comment from Aliaksei's review. --- c++/src/Reader.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 6b1f25b052..b7cf00dc52 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -996,6 +996,8 @@ namespace orc { } int64_t getEpochOffset() { + // Build the literal for the ORC epoch + // 2015 Jan 1 00:00:00 struct tm epoch; epoch.tm_sec = 0; epoch.tm_min = 0;