Closed
@@ -133,7 +133,7 @@ private static String createErrorMsg(final BufferAllocator allocator, final int
* @param val An integer value.
* @return The closest power of two of that value.
*/
- static int nextPowerOfTwo(int val) {
+ public static int nextPowerOfTwo(int val) {
int highestBit = Integer.highestOneBit(val);
if (highestBit == val) {
return val;
@@ -142,6 +142,21 @@ static int nextPowerOfTwo(int val) {
}
}

+ /**
+ * Rounds up the provided value to the nearest power of two.
+ *
+ * @param val A long value.
+ * @return The closest power of two of that value.
+ */
+ public static long nextPowerOfTwo(long val) {
+ long highestBit = Long.highestOneBit(val);
+ if (highestBit == val) {
+ return val;
+ } else {
+ return highestBit << 1;
+ }
+ }
+
public static StringBuilder indent(StringBuilder sb, int indent) {
final char[] indentation = new char[indent * 2];
Arrays.fill(indentation, ' ');
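The behavior of the `long` overload added in this hunk can be checked in isolation. The following is a minimal sketch that copies the method body from the diff into a standalone class; the class name `PowerOfTwoSketch` and the `main` method are illustrative and not part of the PR. It shows that the method rounds up (exact powers of two are returned unchanged), that 0 maps to 0, and that inputs above 2^62 overflow to a negative value.

```java
// Hypothetical standalone class; only the body of nextPowerOfTwo comes from the diff.
final class PowerOfTwoSketch {

    /** Rounds val up to a power of two; exact powers of two are returned unchanged. */
    static long nextPowerOfTwo(long val) {
        long highestBit = Long.highestOneBit(val);
        if (highestBit == val) {
            return val;             // already a power of two (or zero)
        } else {
            return highestBit << 1; // next power of two above val
        }
    }

    public static void main(String[] args) {
        System.out.println(nextPowerOfTwo(100_000L));        // 131072
        System.out.println(nextPowerOfTwo(131_072L));        // 131072
        System.out.println(nextPowerOfTwo(0L));              // 0
        // Above 2^62 the shift overflows into the sign bit.
        System.out.println(nextPowerOfTwo((1L << 62) + 1));  // -9223372036854775808
    }
}
```

In the reAlloc() methods changed by this PR the argument is at most twice an `int` capacity, so the overflow case should not be reachable from those call sites.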
13 changes: 10 additions & 3 deletions java/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -208,14 +208,21 @@ private void allocateBytes(final long size) {
* @throws org.apache.arrow.memory.OutOfMemoryException if it can't allocate the new buffer
*/
public void reAlloc() {
- final long newAllocationSize = allocationSizeInBytes * 2L;
- if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+ long baseSize = allocationSizeInBytes;
+ final int currentBufferCapacity = data.capacity();
+ if (baseSize < (long)currentBufferCapacity) {
Contributor

The logic here is not very straightforward to me:

  • In what case is allocationSizeInBytes less than data.capacity()?
  • Why do we want to set the new allocation to data.capacity() * 2 instead of allocationSizeInBytes * 2?

Contributor Author

Take a vector A and write enough data to it to trigger at least two reallocs.
Now transfer the vector to vector B.
Now do something that triggers reAlloc() for vector B. The reAlloc() will segfault because allocationSizeInBytes is still at the initial default value, whereas this vector's buffer is probably 4x that size.
reAlloc() will try to copy data from a buffer of at least 128K into a 64K buffer and segfault.
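The scenario described in this comment can be reproduced in miniature without Arrow. The sketch below is a toy model under stated assumptions: `ToyVector`, `reAllocOld()`, `reAllocFixed()` and `transferTo()` are hypothetical names, a `byte[]` stands in for the ArrowBuf, and the 64K initial allocation is taken from the numbers in the thread. With plain Java arrays the oversized copy throws an exception instead of segfaulting, and the power-of-two rounding from the PR is left out for brevity.

```java
final class TransferReallocSketch {
    // Assumed initial allocation for illustration; the thread mentions 64K.
    static final int INITIAL_ALLOCATION = 64 * 1024;

    /** Hypothetical stand-in for a vector: a byte array plus the cached allocation size. */
    static final class ToyVector {
        int allocationSizeInBytes = INITIAL_ALLOCATION;
        byte[] data = new byte[0];

        void allocateNew() {
            data = new byte[allocationSizeInBytes];
        }

        /** Pre-fix sizing: doubles the cached allocation size and ignores the real buffer. */
        void reAllocOld() {
            grow(allocationSizeInBytes * 2L);
        }

        /** Post-fix sizing: doubles whichever is larger, cached size or actual capacity. */
        void reAllocFixed() {
            grow(Math.max((long) allocationSizeInBytes, (long) data.length) * 2L);
        }

        private void grow(long newSize) {
            byte[] newBuf = new byte[(int) newSize];
            // Throws IndexOutOfBoundsException when newBuf is smaller than data.
            System.arraycopy(data, 0, newBuf, 0, data.length);
            data = newBuf;
            allocationSizeInBytes = (int) newSize;
        }

        /** Moves only the buffer to the target; the target keeps its own cached size. */
        void transferTo(ToyVector target) {
            target.data = data;
            data = new byte[0];
        }
    }

    public static void main(String[] args) {
        ToyVector a = new ToyVector();
        a.allocateNew();
        a.reAllocOld(); // 64K -> 128K
        a.reAllocOld(); // 128K -> 256K

        ToyVector b = new ToyVector();
        a.transferTo(b); // b.data is 256K, b.allocationSizeInBytes is still 64K

        try {
            b.reAllocOld(); // tries to copy 256K into a 128K buffer
        } catch (IndexOutOfBoundsException e) {
            System.out.println("old sizing failed");
        }

        b.reAllocFixed(); // grows from the real 256K capacity
        System.out.println(b.data.length); // 524288
    }
}
```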

Contributor

Hmm... Thanks for the explanation.

What are the semantics of transfer, and why doesn't it set allocationSizeInBytes for vector B in this case?

Contributor Author

We saw the problem while dealing with a complex JSON schema. A detailed problem description is in #1097.

Contributor Author

The transfer() function is aimed at transferring the data buffer (along with its ownership) from one vector to a target vector of the same type.

It is not clear to me whether we would want to transfer more state in that function. Secondly, there could be other cases where allocationSizeInBytes < buffer capacity, so in any case we are probably better off safeguarding reAlloc() regardless.

One such case could be vector reset(), which resets the value of allocationSizeInBytes as well. If we reAlloc() a vector after doing reset(), I think we will run into the same problem.

Contributor

@icexelloss, Sep 19, 2017

Oh, I didn't realize there is another PR. Thanks for the context.

Edit: I didn't see the second comment, which answers my question.

Contributor

I see. But doesn't this change cause a weird reAlloc() effect after reset(), though? I.e., if we reset a 128KB double vector to 32KB and then reAlloc(), it would be 256KB instead of 64KB, and it would also have the old (incorrect) values in the 32-128KB range?

Contributor Author

reset() already has a problem that I think we should fix across all vector types. reset() should not re-initialize allocationSizeInBytes at all, because reset() is typically aimed at zeroing out the buffer, resetting the mutator/accessor, etc. The underlying buffer capacity remains the same after reset().

So for your example, the 128KB buffer remains a 128KB buffer, zeroed out upon reset(). On a subsequent reAlloc(), we will go to 256KB. There won't be any incorrect or garbage bits lying around in the data buffer because the entire buffer is zeroed out.
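The 128KB example from this exchange can be worked through with a small model. This is a sketch under assumptions, not the Arrow code: `ResetReallocSketch`, `reset()` and `reAlloc()` are hypothetical helpers, a `byte[]` stands in for the data buffer, and reset() is modeled as the thread describes it (zero the buffer, and currently also fall back to the initial allocation size).

```java
import java.util.Arrays;

final class ResetReallocSketch {
    static final int KB = 1024;

    /** Models reset() as described in the thread: zero the buffer, return the re-initialized size. */
    static int reset(byte[] data, int initialAllocation) {
        Arrays.fill(data, (byte) 0);
        return initialAllocation;
    }

    /** Models the patched reAlloc(): grow from the larger of cached size and real capacity. */
    static byte[] reAlloc(byte[] data, int allocationSizeInBytes) {
        long baseSize = Math.max((long) allocationSizeInBytes, (long) data.length);
        byte[] newBuf = new byte[(int) (baseSize * 2L)]; // Java arrays start zero-filled
        System.arraycopy(data, 0, newBuf, 0, data.length);
        return newBuf;
    }

    public static void main(String[] args) {
        byte[] data = new byte[128 * KB];
        Arrays.fill(data, (byte) 7);               // pretend the vector holds old values

        int allocationSize = reset(data, 32 * KB); // buffer stays 128KB, cached size drops to 32KB
        byte[] grown = reAlloc(data, allocationSize);

        System.out.println(grown.length / KB);     // 256
        boolean allZero = true;
        for (byte b : grown) {
            allZero &= (b == 0);
        }
        System.out.println(allZero);               // true
    }
}
```

The new buffer is 256KB because growth starts from the real 128KB capacity, and no old values survive because the source buffer was zeroed before the copy.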

Contributor Author

Am I missing something?

Contributor

I see. Yeah, you are right. This is correct.

However, in general, I find the usage of allocationSizeInBytes and data.capacity() confusing. They seem to be the same thing but are used inconsistently in various places.

For instance, it's not clear to me why we don't just double data.capacity() in reAlloc() instead of checking both allocationSizeInBytes and data.capacity().

Maybe we should have a follow-up Jira to:

  • Document the difference between the two
  • Check whether allocationSizeInBytes and data.capacity() are used correctly in all places

+ baseSize = (long)currentBufferCapacity;
+ }
+ long newAllocationSize = baseSize * 2L;
+ newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
+ if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
}

logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", name, allocationSizeInBytes, newAllocationSize);
final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
- newBuf.setBytes(0, data, 0, data.capacity());
+ newBuf.setBytes(0, data, 0, currentBufferCapacity);
final int halfNewCapacity = newBuf.capacity() / 2;
newBuf.setZero(halfNewCapacity, halfNewCapacity);
newBuf.writerIndex(data.writerIndex());
@@ -370,13 +370,20 @@ public void reset() {
}

public void reAlloc() {
- final long newAllocationSize = allocationSizeInBytes*2L;
+ long baseSize = allocationSizeInBytes;
+ final int currentBufferCapacity = data.capacity();
+ if (baseSize < (long)currentBufferCapacity) {
Contributor

This seems to be the same as in FixedValueVectors. Maybe some refactoring?

Contributor Author

Sorry, I don't understand the comment. The same problem is fixed across reAlloc() of all vectors -- bit vector, fixed width and variable width.

Contributor

Oh, I meant that since this logic looks similar, I am wondering if we can refactor the shared logic into the base class.

Contributor Author

I had thought about it, but since the inheritance hierarchy and templates might look different with ARROW-1463, I thought maybe there is not much to gain from refactoring now?

Contributor

I see. Yeah, I don't love it, but if we are going to fix this in ARROW-1463 then I am OK with it.
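For illustration only, the shared sizing logic that this thread talks about factoring out could look roughly like the helper below. This is a hypothetical sketch and not something the PR does (the refactoring is deferred to ARROW-1463): the class name `ReallocSizing`, the method `newAllocationSize()`, the `Integer.MAX_VALUE` limit and the use of `IllegalStateException` in place of `OversizedAllocationException` are all assumptions made to keep the example self-contained.

```java
// Hypothetical helper; none of these names exist in the PR.
final class ReallocSizing {
    // Assumed limit for illustration; the real MAX_ALLOCATION_SIZE lives in the vector classes.
    static final long MAX_ALLOCATION_SIZE = Integer.MAX_VALUE;

    /** Same body as the long overload added to BaseAllocator in this PR. */
    static long nextPowerOfTwo(long val) {
        long highestBit = Long.highestOneBit(val);
        return (highestBit == val) ? val : highestBit << 1;
    }

    /**
     * Computes the size of the buffer that reAlloc() should allocate:
     * double the larger of the cached size and the real capacity,
     * rounded up to a power of two and checked against the limit.
     */
    static int newAllocationSize(long allocationSizeInBytes, int currentBufferCapacity) {
        long baseSize = Math.max(allocationSizeInBytes, (long) currentBufferCapacity);
        long newSize = nextPowerOfTwo(baseSize * 2L);
        if (newSize > MAX_ALLOCATION_SIZE) {
            // Stand-in for OversizedAllocationException.
            throw new IllegalStateException("Unable to expand the buffer. Max allowed buffer size is reached.");
        }
        return (int) newSize;
    }

    public static void main(String[] args) {
        System.out.println(newAllocationSize(64 * 1024, 256 * 1024)); // 524288
        System.out.println(newAllocationSize(100, 0));                // 256
    }
}
```

Each reAlloc() would then only differ in how it copies and zeroes the buffer, which is the part that varies between the bit, fixed-width and variable-width vectors in this diff.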

+ baseSize = (long)currentBufferCapacity;
+ }
+ long newAllocationSize = baseSize * 2L;
+ newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
}

final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
- newBuf.setBytes(0, data, 0, data.capacity());
+ newBuf.setBytes(0, data, 0, currentBufferCapacity);
data.release();
data = newBuf;
allocationSizeInBytes = (int)newAllocationSize;
12 changes: 10 additions & 2 deletions java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -19,6 +19,7 @@
package org.apache.arrow.vector;

import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.BitHolder;
@@ -208,15 +209,22 @@ private void allocateBytes(final long size) {
* Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
*/
public void reAlloc() {
- final long newAllocationSize = allocationSizeInBytes * 2L;
+ long baseSize = allocationSizeInBytes;
+ final int currentBufferCapacity = data.capacity();
+ if (baseSize < (long)currentBufferCapacity) {
+ baseSize = (long)currentBufferCapacity;
+ }
+ long newAllocationSize = baseSize * 2L;
+ newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
}

final int curSize = (int) newAllocationSize;
final ArrowBuf newBuf = allocator.buffer(curSize);
newBuf.setZero(0, newBuf.capacity());
- newBuf.setBytes(0, data, 0, data.capacity());
+ newBuf.setBytes(0, data, 0, currentBufferCapacity);
data.release();
data = newBuf;
allocationSizeInBytes = curSize;
193 changes: 193 additions & 0 deletions java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -234,6 +235,198 @@ public void testSplitAndTransfer2() throws Exception {
}
}

@Test
public void testReallocAfterVectorTransfer1() {
try (final BitVector vector = new BitVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(4096);
int valueCapacity = vector.getValueCapacity();
assertEquals(4096, valueCapacity);

final BitVector.Mutator mutator = vector.getMutator();
final BitVector.Accessor accessor = vector.getAccessor();

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity; i++) {
int val = accessor.get(i);
if ((i & 1) == 1) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* trigger first realloc */
mutator.setSafeToOne(valueCapacity);
assertEquals(valueCapacity * 2, vector.getValueCapacity());

for (int i = valueCapacity; i < valueCapacity*2; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity*2; i++) {
int val = accessor.get(i);
if (((i & 1) == 1) || (i == valueCapacity)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* trigger second realloc */
mutator.setSafeToOne(valueCapacity*2);
assertEquals(valueCapacity * 4, vector.getValueCapacity());

for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity*4; i++) {
int val = accessor.get(i);
if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* now transfer the vector */
TransferPair transferPair = vector.getTransferPair(allocator);
transferPair.transfer();
final BitVector toVector = (BitVector)transferPair.getTo();
final BitVector.Accessor toAccessor = toVector.getAccessor();
final BitVector.Mutator toMutator = toVector.getMutator();

assertEquals(valueCapacity * 4, toVector.getValueCapacity());

/* realloc the toVector */
toMutator.setSafeToOne(valueCapacity * 4);

for (int i = 0; i < toVector.getValueCapacity(); i++) {
int val = toAccessor.get(i);
if (i <= valueCapacity * 4) {
if (((i & 1) == 1) || (i == valueCapacity) ||
(i == valueCapacity*2) || (i == valueCapacity*4)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

toVector.close();
}
}

@Test
public void testReallocAfterVectorTransfer2() {
try (final NullableBitVector vector = new NullableBitVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(4096);
int valueCapacity = vector.getValueCapacity();
assertEquals(4096, valueCapacity);

final NullableBitVector.Mutator mutator = vector.getMutator();
final NullableBitVector.Accessor accessor = vector.getAccessor();

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* trigger first realloc */
mutator.setSafe(valueCapacity, 1, 1);
assertEquals(valueCapacity * 2, vector.getValueCapacity());

for (int i = valueCapacity; i < valueCapacity*2; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity*2; i++) {
if (((i & 1) == 1) || (i == valueCapacity)) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* trigger second realloc */
mutator.setSafe(valueCapacity*2, 1, 1);
assertEquals(valueCapacity * 4, vector.getValueCapacity());

for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity*4; i++) {
if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* now transfer the vector */
TransferPair transferPair = vector.getTransferPair(allocator);
transferPair.transfer();
final NullableBitVector toVector = (NullableBitVector)transferPair.getTo();
final NullableBitVector.Accessor toAccessor = toVector.getAccessor();
final NullableBitVector.Mutator toMutator = toVector.getMutator();

assertEquals(valueCapacity * 4, toVector.getValueCapacity());

/* realloc the toVector */
toMutator.setSafe(valueCapacity * 4, 1, 1);

for (int i = 0; i < toVector.getValueCapacity(); i++) {
if (i <= valueCapacity * 4) {
if (((i & 1) == 1) || (i == valueCapacity) ||
(i == valueCapacity*2) || (i == valueCapacity*4)) {
assertFalse("unexpected cleared bit at index: " + i, toAccessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
}
}
else {
assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
}
}

toVector.close();
}
}

@Test
public void testBitVector() {
// Create a new value vector for 1024 integers