diff --git a/cpp/src/arrow/util/bitmap_ops.cc b/cpp/src/arrow/util/bitmap_ops.cc index 0c3493040ad..adc95cba8df 100644 --- a/cpp/src/arrow/util/bitmap_ops.cc +++ b/cpp/src/arrow/util/bitmap_ops.cc @@ -70,6 +70,218 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) { return count; } +namespace { + +template +class BitmapWordReader { + public: + BitmapWordReader(const uint8_t* bitmap, int64_t offset, int64_t length) { + bitmap_ = bitmap + offset / 8; + offset_ = offset % 8; + bitmap_end_ = bitmap_ + BitUtil::BytesForBits(offset_ + length); + + // decrement word count by one as we may touch two adjacent words in one iteration + nwords_ = length / (sizeof(Word) * 8) - 1; + if (nwords_ < 0) { + nwords_ = 0; + } + trailing_bits_ = static_cast(length - nwords_ * sizeof(Word) * 8); + trailing_bytes_ = static_cast(BitUtil::BytesForBits(trailing_bits_)); + + if (nwords_ > 0) { + current_word_ = load(bitmap_); + } else if (length > 0) { + current_byte_ = load(bitmap_); + } + } + + Word NextWord() { + bitmap_ += sizeof(Word); + const Word next_word = load(bitmap_); + Word word = current_word_; + if (offset_) { + // combine two adjacent words into one word + // |<------ next ----->|<---- current ---->| + // +-------------+-----+-------------+-----+ + // | --- | A | B | --- | + // +-------------+-----+-------------+-----+ + // | | offset + // v v + // +-----+-------------+ + // | A | B | + // +-----+-------------+ + // |<------ word ----->| + word >>= offset_; + word |= next_word << (sizeof(Word) * 8 - offset_); + } + current_word_ = next_word; + return word; + } + + uint8_t NextTrailingByte(int& valid_bits) { + uint8_t byte; + DCHECK_GT(trailing_bits_, 0); + + if (trailing_bits_ <= 8) { + // last byte + valid_bits = trailing_bits_; + trailing_bits_ = 0; + byte = 0; + internal::BitmapReader reader(bitmap_, offset_, valid_bits); + for (int i = 0; i < valid_bits; ++i) { + byte >>= 1; + if (reader.IsSet()) { + byte |= 0x80; + } + reader.Next(); + } + byte >>= (8 - valid_bits); + } else { + ++bitmap_; + const uint8_t next_byte = load(bitmap_); + byte = current_byte_; + if (offset_) { + byte >>= offset_; + byte |= next_byte << (8 - offset_); + } + current_byte_ = next_byte; + trailing_bits_ -= 8; + valid_bits = 8; + } + return byte; + } + + int64_t words() const { return nwords_; } + int trailing_bytes() const { return trailing_bytes_; } + + private: + int64_t offset_; + const uint8_t* bitmap_; + + const uint8_t* bitmap_end_; + int64_t nwords_; + int trailing_bits_; + int trailing_bytes_; + union { + Word current_word_; + struct { +#if ARROW_LITTLE_ENDIAN == 0 + uint8_t padding_bytes_[sizeof(Word) - 1]; +#endif + uint8_t current_byte_; + }; + }; + + template + DType load(const uint8_t* bitmap) { + DCHECK_LE(bitmap + sizeof(DType), bitmap_end_); + return BitUtil::ToLittleEndian(util::SafeLoadAs(bitmap)); + } +}; + +template +class BitmapWordWriter { + public: + BitmapWordWriter(uint8_t* bitmap, int64_t offset, int64_t length) { + bitmap_ = bitmap + offset / 8; + offset_ = offset % 8; + bitmap_end_ = bitmap_ + BitUtil::BytesForBits(offset_ + length); + mask_ = (1U << offset_) - 1; + + if (offset_) { + if (length >= static_cast(sizeof(Word) * 8)) { + current_word_ = load(bitmap_); + } else if (length > 0) { + current_byte_ = load(bitmap_); + } + } + } + + void PutNextWord(Word word) { + if (offset_) { + // split one word into two adjacent words, don't touch unused bits + // |<------ word ----->| + // +-----+-------------+ + // | A | B | + // +-----+-------------+ + // | | + // v v offset + // +-------------+-----+-------------+-----+ + // | --- | A | B | --- | + // +-------------+-----+-------------+-----+ + // |<------ next ----->|<---- current ---->| + word = (word << offset_) | (word >> (sizeof(Word) * 8 - offset_)); + Word next_word = load(bitmap_ + sizeof(Word)); + current_word_ = (current_word_ & mask_) | (word & ~mask_); + next_word = (next_word & ~mask_) | (word & mask_); + store(bitmap_, current_word_); + store(bitmap_ + sizeof(Word), next_word); + current_word_ = next_word; + } else { + store(bitmap_, word); + } + bitmap_ += sizeof(Word); + } + + void PutNextTrailingByte(uint8_t byte, int valid_bits) { + if (valid_bits == 8) { + if (offset_) { + byte = (byte << offset_) | (byte >> (8 - offset_)); + uint8_t next_byte = load(bitmap_ + 1); + current_byte_ = (current_byte_ & mask_) | (byte & ~mask_); + next_byte = (next_byte & ~mask_) | (byte & mask_); + store(bitmap_, current_byte_); + store(bitmap_ + 1, next_byte); + current_byte_ = next_byte; + } else { + store(bitmap_, byte); + } + ++bitmap_; + } else { + DCHECK_GT(valid_bits, 0); + DCHECK_LT(valid_bits, 8); + DCHECK_LE(bitmap_ + BitUtil::BytesForBits(offset_ + valid_bits), bitmap_end_); + internal::BitmapWriter writer(bitmap_, offset_, valid_bits); + for (int i = 0; i < valid_bits; ++i) { + (byte & 0x01) ? writer.Set() : writer.Clear(); + writer.Next(); + byte >>= 1; + } + writer.Finish(); + } + } + + private: + int64_t offset_; + uint8_t* bitmap_; + + const uint8_t* bitmap_end_; + uint64_t mask_; + union { + Word current_word_; + struct { +#if ARROW_LITTLE_ENDIAN == 0 + uint8_t padding_bytes_[sizeof(Word) - 1]; +#endif + uint8_t current_byte_; + }; + }; + + template + DType load(const uint8_t* bitmap) { + DCHECK_LE(bitmap + sizeof(DType), bitmap_end_); + return BitUtil::ToLittleEndian(util::SafeLoadAs(bitmap)); + } + + template + void store(uint8_t* bitmap, DType data) { + DCHECK_LE(bitmap + sizeof(DType), bitmap_end_); + util::SafeStore(bitmap, BitUtil::FromLittleEndian(data)); + } +}; + +} // namespace + template void TransferBitmap(const uint8_t* data, int64_t offset, int64_t length, int64_t dest_offset, uint8_t* dest) { @@ -78,69 +290,26 @@ void TransferBitmap(const uint8_t* data, int64_t offset, int64_t length, int64_t dest_byte_offset = dest_offset / 8; int64_t dest_bit_offset = dest_offset % 8; int64_t num_bytes = BitUtil::BytesForBits(length); - // Shift dest by its byte offset - dest += dest_byte_offset; if (bit_offset || dest_bit_offset) { - data += byte_offset; - - const int64_t n_words = length / 64; - if (n_words > 1) { - auto load_word = [](const uint8_t* bytes) -> uint64_t { - return BitUtil::ToLittleEndian(util::SafeLoadAs(bytes)); - }; - auto shift_word = [](uint64_t current, uint64_t next, int64_t shift) -> uint64_t { - if (shift == 0) return current; - return (current >> shift) | (next << (64 - shift)); - }; - auto write_word = [](uint8_t* bytes, uint64_t word) { - util::SafeStore(bytes, BitUtil::FromLittleEndian(word)); - }; - - const uint64_t dest_mask = (1U << dest_bit_offset) - 1; - auto data_current = load_word(data); - auto dest_current = load_word(dest); - - for (int64_t i = 0; i < n_words - 1; ++i) { - data += 8; - const auto data_next = load_word(data); - auto word = shift_word(data_current, data_next, bit_offset); - data_current = data_next; - if (invert_bits) { - word = ~word; - } - - if (dest_bit_offset) { - word = (word << dest_bit_offset) | (word >> (64 - dest_bit_offset)); - auto dest_next = load_word(dest + 8); - dest_current = (dest_current & dest_mask) | (word & ~dest_mask); - dest_next = (dest_next & ~dest_mask) | (word & dest_mask); - write_word(dest, dest_current); - write_word(dest + 8, dest_next); - dest_current = dest_next; - } else { - write_word(dest, word); - } - dest += 8; - } + auto reader = internal::BitmapWordReader(data, offset, length); + auto writer = internal::BitmapWordWriter(dest, dest_offset, length); - length -= (n_words - 1) * 64; + auto nwords = reader.words(); + while (nwords--) { + auto word = reader.NextWord(); + writer.PutNextWord(invert_bits ? ~word : word); } - - internal::BitmapReader valid_reader(data, bit_offset, length); - internal::BitmapWriter valid_writer(dest, dest_bit_offset, length); - - for (int64_t i = 0; i < length; i++) { - if (invert_bits ^ valid_reader.IsSet()) { - valid_writer.Set(); - } else { - valid_writer.Clear(); - } - valid_reader.Next(); - valid_writer.Next(); + auto nbytes = reader.trailing_bytes(); + while (nbytes--) { + int valid_bits; + auto byte = reader.NextTrailingByte(valid_bits); + writer.PutNextTrailingByte(invert_bits ? ~byte : byte, valid_bits); } - valid_writer.Finish(); } else { + // Shift dest by its byte offset + dest += dest_byte_offset; + // Take care of the trailing bits in the last byte int64_t trailing_bits = num_bytes * 8 - length; uint8_t trail = 0; @@ -213,15 +382,15 @@ Result> InvertBitmap(MemoryPool* pool, const uint8_t* da } bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right, - int64_t right_offset, int64_t bit_length) { + int64_t right_offset, int64_t length) { if (left_offset % 8 == 0 && right_offset % 8 == 0) { // byte aligned, can use memcmp - bool bytes_equal = std::memcmp(left + left_offset / 8, right + right_offset / 8, - bit_length / 8) == 0; + bool bytes_equal = + std::memcmp(left + left_offset / 8, right + right_offset / 8, length / 8) == 0; if (!bytes_equal) { return false; } - for (int64_t i = (bit_length / 8) * 8; i < bit_length; ++i) { + for (int64_t i = (length / 8) * 8; i < length; ++i) { if (BitUtil::GetBit(left, left_offset + i) != BitUtil::GetBit(right, right_offset + i)) { return false; @@ -231,48 +400,20 @@ bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right } // Unaligned slow case - left += left_offset / 8; - right += right_offset / 8; - left_offset %= 8; - right_offset %= 8; - - // process in 64 bits, may touch two adjacent words in one iteration - const int64_t n_words = bit_length / 64; - if (n_words > 1) { - auto load_word = [](const uint8_t* bytes) -> uint64_t { - return BitUtil::ToLittleEndian(util::SafeLoadAs(bytes)); - }; - auto shift_word = [](uint64_t current, uint64_t next, int64_t shift) -> uint64_t { - if (shift == 0) return current; - return (current >> shift) | (next << (64 - shift)); - }; - - auto left_current = load_word(left); - auto right_current = load_word(right); - - for (int64_t i = 0; i < n_words - 1; ++i) { - left += 8; - auto left_next = load_word(left); - auto left_word = shift_word(left_current, left_next, left_offset); - left_current = left_next; - - right += 8; - auto right_next = load_word(right); - auto right_word = shift_word(right_current, right_next, right_offset); - right_current = right_next; + auto left_reader = internal::BitmapWordReader(left, left_offset, length); + auto right_reader = internal::BitmapWordReader(right, right_offset, length); - if (left_word != right_word) { - return false; - } + auto nwords = left_reader.words(); + while (nwords--) { + if (left_reader.NextWord() != right_reader.NextWord()) { + return false; } - - bit_length -= (n_words - 1) * 64; } - - // process in bit - for (int64_t i = 0; i < bit_length; ++i) { - if (BitUtil::GetBit(left, left_offset + i) != - BitUtil::GetBit(right, right_offset + i)) { + auto nbytes = left_reader.trailing_bytes(); + while (nbytes--) { + int valid_bits; + if (left_reader.NextTrailingByte(valid_bits) != + right_reader.NextTrailingByte(valid_bits)) { return false; } } @@ -298,116 +439,32 @@ void AlignedBitmapOp(const uint8_t* left, int64_t left_offset, const uint8_t* ri } } -template