Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions c++/include/orc/Vector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,24 @@ namespace orc {
friend class DecimalHive11ColumnReader;
};

/**
* A column vector batch for storing timestamp values.
* The timestamps are stored split into the time_t value (seconds since
* 1 Jan 1970 00:00:00) and the nanoseconds within the time_t value.
*/
struct TimestampVectorBatch: public ColumnVectorBatch {
TimestampVectorBatch(uint64_t capacity, MemoryPool& pool);
virtual ~TimestampVectorBatch();
std::string toString() const;
void resize(uint64_t capacity);

// the number of seconds past 1 Jan 1970 00:00 UTC (aka time_t)
DataBuffer<int64_t> data;

// the nanoseconds of each value
DataBuffer<int64_t> nanoseconds;
};

}

#endif
32 changes: 10 additions & 22 deletions c++/src/ColumnPrinter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace orc {

class TimestampColumnPrinter: public ColumnPrinter {
private:
const int64_t* data;
time_t epoch;
const int64_t* seconds;
const int64_t* nanoseconds;

public:
TimestampColumnPrinter(std::string&, const Type&);
Expand Down Expand Up @@ -667,33 +667,18 @@ namespace orc {
const Type& type
): ColumnPrinter(buffer,
type) {
struct tm epochTm;
epochTm.tm_sec = 0;
epochTm.tm_min = 0;
epochTm.tm_hour = 0;
epochTm.tm_mday = 1;
epochTm.tm_mon = 0;
epochTm.tm_year = 70;
epochTm.tm_isdst = 0;
epoch = mktime(&epochTm);
// PASS
}

void TimestampColumnPrinter::printRow(uint64_t rowId) {
const int64_t NANOS_PER_SECOND = 1000000000;
const int64_t NANO_DIGITS = 9;
if (hasNulls && !notNull[rowId]) {
writeString(buffer, "null");
} else {
int64_t nanos = data[rowId] % NANOS_PER_SECOND;
time_t seconds =
static_cast<time_t>(data[rowId] / NANOS_PER_SECOND) + epoch;
// make sure the nanos are positive
if (nanos < 0) {
seconds -= 1;
nanos = -nanos;
}
int64_t nanos = nanoseconds[rowId];
time_t secs = static_cast<time_t>(seconds[rowId]);
struct tm tmValue;
localtime_r(&seconds, &tmValue);
localtime_r(&secs, &tmValue);
char timeBuffer[20];
strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
writeChar(buffer, '"');
Expand All @@ -720,6 +705,9 @@ namespace orc {

void TimestampColumnPrinter::reset(const ColumnVectorBatch& batch) {
ColumnPrinter::reset(batch);
data = dynamic_cast<const LongVectorBatch&>(batch).data.data();
const TimestampVectorBatch& ts =
dynamic_cast<const TimestampVectorBatch&>(batch);
seconds = ts.data.data();
nanoseconds = ts.nanoseconds.data();
}
}
48 changes: 24 additions & 24 deletions c++/src/ColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ namespace orc {
numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : 0);
}

class TimestampColumnReader: public IntegerColumnReader {
class TimestampColumnReader: public ColumnReader {
private:
std::unique_ptr<orc::RleDecoder> secondsRle;
std::unique_ptr<orc::RleDecoder> nanoRle;
DataBuffer<int64_t> nanoBuffer;
const int64_t epochOffset;

public:
TimestampColumnReader(const Type& type, StripeStreams& stripe);
Expand All @@ -280,10 +281,13 @@ namespace orc {

TimestampColumnReader::TimestampColumnReader(const Type& type,
StripeStreams& stripe
): IntegerColumnReader(type,
stripe),
nanoBuffer(memoryPool, 1024){
): ColumnReader(type, stripe),
epochOffset(stripe.getEpochOffset()) {
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
secondsRle = createRleDecoder(stripe.getStream(columnId,
proto::Stream_Kind_DATA,
true),
true, vers, memoryPool);
nanoRle = createRleDecoder(stripe.getStream(columnId,
proto::Stream_Kind_SECONDARY,
true),
Expand All @@ -295,41 +299,37 @@ namespace orc {
}

uint64_t TimestampColumnReader::skip(uint64_t numValues) {
numValues = IntegerColumnReader::skip(numValues);
numValues = ColumnReader::skip(numValues);
secondsRle->skip(numValues);
nanoRle->skip(numValues);
return numValues;
}

void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) {
uint64_t numValues,
char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
int64_t* pStamp = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();

// make sure that nanoBuffer is large enough
if (numValues > nanoBuffer.size()) {
nanoBuffer.resize(numValues);
}

rle->next(pStamp, numValues, notNull);
nanoRle->next(nanoBuffer.data(), numValues, notNull);
TimestampVectorBatch& timestampBatch =
dynamic_cast<TimestampVectorBatch&>(rowBatch);
int64_t *secsBuffer = timestampBatch.data.data();
secondsRle->next(secsBuffer, numValues, notNull);
int64_t *nanoBuffer = timestampBatch.nanoseconds.data();
nanoRle->next(nanoBuffer, numValues, notNull);

// Construct the values
for(uint64_t i=0; i < numValues; i++) {
if (notNull == nullptr || notNull[i]) {
int64_t nanosec = nanoBuffer[i] >> 3;
uint64_t zeros = nanoBuffer[i] & 0x7;
nanoBuffer[i] >>= 3;
if (zeros != 0) {
for(uint64_t j = 0; j <= zeros; ++j) {
nanosec *= 10;
nanoBuffer[i] *= 10;
}
}
pStamp[i] = pStamp[i] * 1000000000 + 1420070400000000000;
if (pStamp[i] >= 0) {
pStamp[i] += nanosec;
} else {
pStamp[i] -= nanosec;
secsBuffer[i] += epochOffset;
if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) {
secsBuffer[i] -= 1;
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions c++/src/ColumnReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ namespace orc {
* Get the memory pool for this reader.
*/
virtual MemoryPool& getMemoryPool() const = 0;

/**
* Get the number of seconds between the ORC epoch and Unix epoch.
* ORC epoch is 1 Jan 2015 00:00:00 local.
* Unix epoch is 1 Jan 1970 00:00:00 UTC.
*/
virtual int64_t getEpochOffset() const = 0;
};

/**
Expand Down
45 changes: 38 additions & 7 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,8 @@ namespace orc {

class ReaderImpl : public Reader {
private:
const int64_t epochOffset;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReaderImpl::epochOffset is not used (we seem to call getEpochOffset() everywhere).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ColumnReader uses StripeStreamsImpl::getEpochOffset() and the implementation of that method uses ReaderImpl::epochOffset. The initializer for ReaderImpl::epochOffset uses the static method getEpochOffset, but it should only be called once per a reader.

// inputs
std::unique_ptr<InputStream> stream;
ReaderOptions options;
Expand Down Expand Up @@ -993,12 +995,27 @@ namespace orc {
}
}

int64_t getEpochOffset() {
// Build the literal for the ORC epoch
// 2015 Jan 1 00:00:00
struct tm epoch;
epoch.tm_sec = 0;
epoch.tm_min = 0;
epoch.tm_hour = 0;
epoch.tm_mday = 1;
epoch.tm_mon = 0;
epoch.tm_year = 2015 - 1900;
epoch.tm_isdst = 0;
return static_cast<int64_t>(mktime(&epoch));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused: Why is the offset 115 years and 1 day, instead of 45 years (2015 - 1970)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the representation of 1 Jan 2015 00:00:00, so it is converting that date into a time_t. Look at the man page for mktime.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I should add a comment saying that :).

ReaderImpl::ReaderImpl(std::unique_ptr<InputStream> input,
const ReaderOptions& opts,
std::unique_ptr<proto::PostScript> _postscript,
std::unique_ptr<proto::Footer> _footer,
uint64_t _footerStart
): stream(std::move(input)),
): epochOffset(getEpochOffset()),
stream(std::move(input)),
options(opts),
footerStart(_footerStart),
memoryPool(*opts.getMemoryPool()),
Expand Down Expand Up @@ -1334,13 +1351,15 @@ namespace orc {
const uint64_t stripeStart;
InputStream& input;
MemoryPool& memoryPool;
const int64_t epochOffset;

public:
StripeStreamsImpl(const ReaderImpl& reader,
const proto::StripeFooter& footer,
uint64_t stripeStart,
InputStream& input,
MemoryPool& memoryPool);
MemoryPool& memoryPool,
int64_t epochOffset);

virtual ~StripeStreamsImpl();

Expand All @@ -1356,18 +1375,22 @@ namespace orc {
bool shouldStream) const override;

MemoryPool& getMemoryPool() const override;

int64_t getEpochOffset() const override;
};

StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader,
const proto::StripeFooter& _footer,
uint64_t _stripeStart,
InputStream& _input,
MemoryPool& _memoryPool
MemoryPool& _memoryPool,
int64_t _epochOffset
): reader(_reader),
footer(_footer),
stripeStart(_stripeStart),
input(_input),
memoryPool(_memoryPool) {
memoryPool(_memoryPool),
epochOffset(_epochOffset) {
// PASS
}

Expand All @@ -1383,10 +1406,15 @@ namespace orc {
return reader.getSelectedColumns();
}

proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId) const {
proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId
) const {
return footer.columns(static_cast<int>(columnId));
}

int64_t StripeStreamsImpl::getEpochOffset() const {
return epochOffset;
}

std::unique_ptr<SeekableInputStream>
StripeStreamsImpl::getStream(int64_t columnId,
proto::Stream_Kind kind,
Expand Down Expand Up @@ -1426,7 +1454,8 @@ namespace orc {
StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
currentStripeInfo.offset(),
*(stream.get()),
memoryPool);
memoryPool,
epochOffset);
reader = buildReader(*(schema.get()), stripeStreams);
}

Expand Down Expand Up @@ -1479,7 +1508,6 @@ namespace orc {
case SHORT:
case INT:
case LONG:
case TIMESTAMP:
case DATE:
result = new LongVectorBatch(capacity, memoryPool);
break;
Expand All @@ -1493,6 +1521,9 @@ namespace orc {
case VARCHAR:
result = new StringVectorBatch(capacity, memoryPool);
break;
case TIMESTAMP:
result = new TimestampVectorBatch(capacity, memoryPool);
break;
case STRUCT:
result = new StructVectorBatch(capacity, memoryPool);
for(uint64_t i=0; i < type.getSubtypeCount(); ++i) {
Expand Down
28 changes: 28 additions & 0 deletions c++/src/Vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,32 @@ namespace orc {
std::string Decimal::toString() const {
return value.toDecimalString(scale);
}

TimestampVectorBatch::TimestampVectorBatch(uint64_t capacity,
MemoryPool& pool
): ColumnVectorBatch(capacity,
pool),
data(pool, capacity),
nanoseconds(pool, capacity) {
// PASS
}

TimestampVectorBatch::~TimestampVectorBatch() {
// PASS
}

std::string TimestampVectorBatch::toString() const {
std::ostringstream buffer;
buffer << "Timestamp vector <" << numElements << " of " << capacity << ">";
return buffer.str();
}

void TimestampVectorBatch::resize(uint64_t cap) {
if (capacity < cap) {
ColumnVectorBatch::resize(cap);
data.resize(cap);
nanoseconds.resize(cap);
}
}

}
38 changes: 25 additions & 13 deletions c++/test/TestColumnPrinter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,33 @@ namespace orc {
std::string line;
std::unique_ptr<Type> type = createPrimitiveType(TIMESTAMP);
std::unique_ptr<ColumnPrinter> printer = createColumnPrinter(line, *type);
LongVectorBatch batch(1024, *getDefaultPool());
TimestampVectorBatch batch(1024, *getDefaultPool());
batch.numElements = 12;
batch.hasNulls = false;
batch.data[0] = 1420070400000000000;
batch.data[1] = 963270000000000000;
batch.data[2] = 1426168859000000000;
batch.data[3] = 1426168859000000001;
batch.data[4] = 1426168859000000010;
batch.data[5] = 1426168859000000100;
batch.data[6] = 1426168859000001000;
batch.data[7] = 1426168859000010000;
batch.data[8] = 1426168859000100000;
batch.data[9] = 1426168859001000000;
batch.data[10] = 1426168859010000000;
batch.data[11] = 1426168859100000000;
batch.data[0] = 1420099200;
batch.data[1] = 963298800;
batch.data[2] = 1426197659;
batch.data[3] = 1426197659;
batch.data[4] = 1426197659;
batch.data[5] = 1426197659;
batch.data[6] = 1426197659;
batch.data[7] = 1426197659;
batch.data[8] = 1426197659;
batch.data[9] = 1426197659;
batch.data[10] = 1426197659;
batch.data[11] = 1426197659;
batch.nanoseconds[0] = 0;
batch.nanoseconds[1] = 0;
batch.nanoseconds[2] = 0;
batch.nanoseconds[3] = 1;
batch.nanoseconds[4] = 10;
batch.nanoseconds[5] = 100;
batch.nanoseconds[6] = 1000;
batch.nanoseconds[7] = 10000;
batch.nanoseconds[8] = 100000;
batch.nanoseconds[9] = 1000000;
batch.nanoseconds[10] = 10000000;
batch.nanoseconds[11] = 100000000;
const char *expected[] = {"\"2015-01-01 00:00:00.0\"",
"\"2000-07-11 00:00:00.0\"",
"\"2015-03-12 15:00:59.0\"",
Expand Down
Loading