Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,12 @@ def __init__(self, name, is_signed, bit_width, nullable=True,
self.max_value = max_value

def _get_generated_data_bounds(self):
signed_iinfo = np.iinfo('int' + str(self.bit_width))
if self.is_signed:
signed_iinfo = np.iinfo('int' + str(self.bit_width))
min_value, max_value = signed_iinfo.min, signed_iinfo.max
else:
# ARROW-1837 Remove this hack and restore full unsigned integer
# range
min_value, max_value = 0, signed_iinfo.max
unsigned_iinfo = np.iinfo('uint' + str(self.bit_width))
min_value, max_value = 0, unsigned_iinfo.max

lower_bound = max(min_value, self.min_value)
upper_bound = min(max_value, self.max_value)
Expand Down Expand Up @@ -1030,8 +1029,7 @@ def _temp_path():
return

file_objs = [
(generate_primitive_case([], name='primitive_no_batches')
.skip_category('Java')),
generate_primitive_case([], name='primitive_no_batches'),
generate_primitive_case([17, 20], name='primitive'),
generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
generate_decimal_case(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ public static void convert(FileInputStream in, OutputStream out) throws IOExcept
try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
if (!reader.loadNextBatch()) {
throw new IOException("Unable to read first record batch");
}
// Only write batches if we loaded one in the first place.
boolean writeBatches = reader.loadNextBatch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this not different from the behavior earlier - we would have thrown exception earlier for zero batches vs what it looks like proceeding silently now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intentional. This was discussed on the ML, we think it is reasonable to have a schema without batches.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it not be better to do that in a different method? I am not sure if existing clients depend on this for validation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. The fact that there are no unit tests for this code path makes me think this is only used in integration tests. Are you familiar with any additional use cases? Also, this change seems like the correct behavior (I don't see why no batches is an exception), but I suppose it would be better to validate at the file level that there wasn't any corruption (though I don't think there are provisions for this in the spec).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure of exact use cases, but have been dealing with a lot of breaking changes while upgrading dremio to use latest arrow and am wary of anything that looks like change in behavior :)

Agree the new behavior seems like the right one to implement. Sounds good for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. At some point we should discuss API/behavior stability on the ML.

try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
writer.start();
while (true) {
while (writeBatches) {
writer.writeBatch();
if (!reader.loadNextBatch()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ public static void convert(InputStream in, OutputStream out) throws IOException
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
if (!reader.loadNextBatch()) {
throw new IOException("Unable to read first record batch");
}
// load the first batch before instantiating the writer so that we have any dictionaries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

// Only writeBatches if we load the first one.
boolean writeBatches = reader.loadNextBatch();
try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
writer.start();
while (true) {
while (writeBatches) {
writer.writeBatch();
if (!reader.loadNextBatch()) {
break;
Expand Down
33 changes: 33 additions & 0 deletions java/vector/src/main/java/org/apache/arrow/vector/UInt1Vector.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;

/**
* UInt1Vector implements a fixed width (1 byte) vector of
* integer values which could be null. A validity buffer (bit vector) is
Expand Down Expand Up @@ -62,6 +64,23 @@ public MinorType getMinorType() {
| vector value retrieval methods |
| |
*----------------------------------------------------------------*/
/**
* Given a data buffer, get the value stored at a particular position
* in the vector.
*
* <p>To avoid overflow, the returned type is one step up from the signed
* type.
*
* <p>This method is mainly meant for integration tests.
*
* @param buffer data buffer
* @param index position of the element.
* @return value stored at the index.
*/
public static short getNoOverflow(final ArrowBuf buffer, final int index) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why public if this should not be used externally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copy and paste; the real intent was only to use it for integration tests. I've updated it.

byte b = buffer.getByte(index * TYPE_WIDTH);
return (short)(0xFF & b);
}


/**
Expand Down Expand Up @@ -107,6 +126,20 @@ public Byte getObject(int index) {
}
}

/**
* Returns the value stored at index without the potential for overflow.
*
* @param index position of element
* @return element at given index
*/
public Short getObjectNoOverflow(int index) {
if (isSet(index) == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there some new pattern about these where we optionally check isSet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is for the primitive types. getObject is supposed to return null if the bitmap isn't set (and wasn't changed with the PR you are referring to).

return null;
} else {
return getNoOverflow(valueBuffer, index);
}
}

/**
* Copies the value at fromIndex to thisIndex (including validity).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;

/**
* UInt2Vector implements a fixed width (2 bytes) vector of
* integer values which could be null. A validity buffer (bit vector) is
Expand Down Expand Up @@ -62,7 +64,19 @@ public MinorType getMinorType() {
| vector value retrieval methods |
| |
*----------------------------------------------------------------*/

/**
 * Reads the element at the given position straight out of a data buffer.
 *
 * <p>The byte offset is computed as {@code index * TYPE_WIDTH} and the value
 * at that offset is returned as a {@code char}.
 *
 * <p>This method is mainly meant for integration tests.
 *
 * @param buffer data buffer to read from
 * @param index position of the element
 * @return value stored at the index
 */
public static char get(final ArrowBuf buffer, final int index) {
  final int byteOffset = index * TYPE_WIDTH;
  return buffer.getChar(byteOffset);
}

/**
* Get the element at the given index from the vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;

/**
* UInt4Vector implements a fixed width (4 bytes) vector of
* integer values which could be null. A validity buffer (bit vector) is
Expand Down Expand Up @@ -62,7 +64,23 @@ public MinorType getMinorType() {
| vector value retrieval methods |
| |
*----------------------------------------------------------------*/

/**
 * Given a data buffer, get the value stored at a particular position
 * in the vector, interpreted as an unsigned 32-bit integer.
 *
 * <p>To avoid overflow, the returned type is one step up from the signed
 * type: the 4 bytes are read as an {@code int} and then zero-extended into
 * a {@code long}, so the result is always in the range [0, 2^32 - 1].
 *
 * <p>This method is mainly meant for integration tests.
 *
 * @param buffer data buffer
 * @param index position of the element.
 * @return value stored at the index, as a non-negative long.
 */
public static long getNoOverflow(final ArrowBuf buffer, final int index) {
  // The mask must be a long literal. The int literal 0xFFFFFFFF is -1, and
  // casting it to long sign-extends to -1L (all 64 bits set), which would
  // leave the sign-extended int untouched and yield negative results.
  return buffer.getInt(index * TYPE_WIDTH) & 0xFFFFFFFFL;
}

/**
* Get the element at the given index from the vector.
Expand Down Expand Up @@ -107,6 +125,20 @@ public Integer getObject(int index) {
}
}

/**
 * Returns the element at the given index as a boxed {@code Long}, using
 * {@code getNoOverflow} on this vector's value buffer to widen it.
 *
 * @param index position of element
 * @return the widened element at the given index, or {@code null} when
 *         {@code isSet(index)} reports 0
 */
public Long getObjectNoOverflow(int index) {
  if (isSet(index) != 0) {
    return getNoOverflow(valueBuffer, index);
  }
  return null;
}

/**
* Copies a value and validity setting to the thisIndex position from the given vector
* at fromIndex.
Expand Down
37 changes: 37 additions & 0 deletions java/vector/src/main/java/org/apache/arrow/vector/UInt8Vector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import java.math.BigInteger;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.impl.UInt8ReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
Expand All @@ -28,6 +30,8 @@
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;

/**
* UInt8Vector implements a fixed width vector (8 bytes) of
* integer values which could be null. A validity buffer (bit vector) is
Expand Down Expand Up @@ -62,6 +66,25 @@ public MinorType getMinorType() {
| vector value retrieval methods |
| |
*----------------------------------------------------------------*/
private static final BigInteger SAFE_CONVERSION_MASK = new BigInteger("ffffffffffffffff", 16);

/**
 * Reads the element at the given position straight out of a data buffer
 * and returns it as a {@link BigInteger}.
 *
 * <p>To avoid overflow, the returned type is one step up from the signed
 * type: the 8 bytes are read as a {@code long} and then masked with
 * {@code SAFE_CONVERSION_MASK} (64 one-bits).
 *
 * <p>This method is mainly meant for integration tests.
 *
 * @param buffer data buffer to read from
 * @param index position of the element
 * @return value stored at the index
 */
public static BigInteger getNoOverflow(final ArrowBuf buffer, final int index) {
  final long rawBits = buffer.getLong(index * TYPE_WIDTH);
  // BigInteger.valueOf keeps the sign of the long; and-ing with the 64-bit
  // mask keeps only the low 64 bits, giving a non-negative result.
  return BigInteger.valueOf(rawBits).and(SAFE_CONVERSION_MASK);
}


/**
Expand Down Expand Up @@ -107,6 +130,20 @@ public Long getObject(int index) {
}
}

/**
 * Returns the element at the given index as a {@link BigInteger}, using
 * {@code getNoOverflow} on this vector's value buffer so that the value is
 * not subject to overflow.
 *
 * @param index position of element
 * @return element at given index, or {@code null} when {@code isSet(index)}
 *         reports 0
 */
public BigInteger getObjectNoOverflow(int index) {
  if (isSet(index) != 0) {
    return getNoOverflow(valueBuffer, index);
  }
  return null;
}

/**
* Copy a value and validity setting from fromIndex in <code>from</code> to this
* Vector at thisIndex.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -324,6 +325,67 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException
}
};

// Reader for UINT1 buffers: consumes one parser token per element, reads it
// as a short and writes its low 8 bits as a single byte.
BufferReader UINT1 = new BufferReader() {
  @Override
  protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
    // NOTE(review): sized with TinyIntVector.TYPE_WIDTH; presumably the same
    // width as the unsigned 1-byte vector -- confirm.
    final ArrowBuf target = allocator.buffer(count * TinyIntVector.TYPE_WIDTH);

    for (int remaining = count; remaining > 0; remaining--) {
      parser.nextToken();
      final short parsed = parser.getShortValue();
      target.writeByte(parsed & 0xFF);
    }

    return target;
  }
};

// Reader for UINT2 buffers: consumes one parser token per element, reads it
// as an int and writes its low 16 bits via writeShort.
BufferReader UINT2 = new BufferReader() {
  @Override
  protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
    // NOTE(review): sized with SmallIntVector.TYPE_WIDTH; presumably the same
    // width as the unsigned 2-byte vector -- confirm.
    final ArrowBuf target = allocator.buffer(count * SmallIntVector.TYPE_WIDTH);

    for (int remaining = count; remaining > 0; remaining--) {
      parser.nextToken();
      final int parsed = parser.getIntValue();
      target.writeShort(parsed & 0xFFFF);
    }

    return target;
  }
};

// Reader for UINT4 buffers: consumes one parser token per element, reads it
// as a long and writes the value narrowed to an int.
BufferReader UINT4 = new BufferReader() {
  @Override
  protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
    // NOTE(review): sized with IntVector.TYPE_WIDTH; presumably the same
    // width as the unsigned 4-byte vector -- confirm.
    final ArrowBuf target = allocator.buffer(count * IntVector.TYPE_WIDTH);

    for (int remaining = count; remaining > 0; remaining--) {
      parser.nextToken();
      final long parsed = parser.getLongValue();
      // The narrowing cast keeps the low 32 bits of the parsed value.
      target.writeInt((int) parsed);
    }

    return target;
  }
};

// Reader for UINT8 buffers: consumes one parser token per element, reads it
// as a BigInteger and writes its longValue() as 8 bytes.
BufferReader UINT8 = new BufferReader() {
  @Override
  protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
    // NOTE(review): sized with BigIntVector.TYPE_WIDTH; presumably the same
    // width as the unsigned 8-byte vector -- confirm.
    final ArrowBuf target = allocator.buffer(count * BigIntVector.TYPE_WIDTH);

    for (int remaining = count; remaining > 0; remaining--) {
      parser.nextToken();
      // longValue() keeps the low 64 bits of the parsed BigInteger.
      target.writeLong(parser.getBigIntegerValue().longValue());
    }

    return target;
  }
};

BufferReader FLOAT4 = new BufferReader() {
@Override
protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
Expand Down Expand Up @@ -470,16 +532,16 @@ private ArrowBuf readIntoBuffer(BufferAllocator allocator, BufferType bufferType
reader = helper.INT8;
break;
case UINT1:
reader = helper.INT1;
reader = helper.UINT1;
break;
case UINT2:
reader = helper.INT2;
reader = helper.UINT2;
break;
case UINT4:
reader = helper.INT4;
reader = helper.UINT4;
break;
case UINT8:
reader = helper.INT8;
reader = helper.UINT8;
break;
case FLOAT4:
reader = helper.FLOAT4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
Expand Down Expand Up @@ -271,6 +275,18 @@ private void writeValueToGenerator(
case BIGINT:
generator.writeNumber(BigIntVector.get(buffer, index));
break;
case UINT1:
generator.writeNumber(UInt1Vector.getNoOverflow(buffer, index));
break;
case UINT2:
generator.writeNumber(UInt2Vector.get(buffer, index));
break;
case UINT4:
generator.writeNumber(UInt4Vector.getNoOverflow(buffer, index));
break;
case UINT8:
generator.writeNumber(UInt8Vector.getNoOverflow(buffer, index));
break;
case FLOAT4:
generator.writeNumber(Float4Vector.get(buffer, index));
break;
Expand Down
Loading