1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
@@ -1528,7 +1528,6 @@ def _temp_path():

generate_duplicate_fieldnames_case()
.skip_category('Go')
.skip_category('Java')
.skip_category('JS')
.skip_category('Rust'),

3 changes: 2 additions & 1 deletion dev/archery/archery/integration/tester_java.py
@@ -37,7 +37,8 @@ class JavaTester(Tester):
FLIGHT_SERVER = True
FLIGHT_CLIENT = True

JAVA_OPTS = ['-Dio.netty.tryReflectionSetAccessible=true']
JAVA_OPTS = ['-Dio.netty.tryReflectionSetAccessible=true',
'-Darrow.struct.conflict.policy=CONFLICT_APPEND']
Member

If I'm not mistaken, this means the "correct" or "compatible" behavior is opt-in via a JVM flag? Are these flags clearly documented somewhere? I think we have a few others.

Contributor Author

Hey @lidavidm, yes the 'compatible' behaviour is opt-in. This may be controversial, but through a mix of being opinionated, ignorant and lazy, I don't understand the value of duplicate names in a struct. Given the scope of implementing such a change fully and the unintended consequences downstream, I have opted to give the library user the option to be compatible with the C++ IPC or maintain backwards compatibility with Java. I am happy to hear what the community thinks, especially if this approach is seen as too heavy-handed.

This flag wasn't documented anywhere, so I have added a note to the 'Java Properties' section in the Java README.md.

Member

I think as long as it's easily controllable in code (so a single application can work with both behaviors) and well-documented, that should be OK. I dislike only having global flags, but that's not the case here. (And having global flags can be useful to tweak the behavior of an application that's otherwise agnostic to the default behavior.)
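
To illustrate the point about per-instance control, here is a minimal, self-contained sketch of a container whose conflict policy is chosen at construction time, so two instances in one application can behave differently. `PolicyAwareFields` is a hypothetical stand-in and not the Arrow implementation; the four policy names are the ones this PR's enum defines, and where a replaced entry ends up in the ordering is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a struct's child list; not part of Arrow. */
final class PolicyAwareFields {
  enum ConflictPolicy { CONFLICT_APPEND, CONFLICT_IGNORE, CONFLICT_REPLACE, CONFLICT_ERROR }

  private final ConflictPolicy policy;
  // Insertion-ordered (name, value) pairs; duplicate names are allowed here.
  private final List<Map.Entry<String, String>> entries = new ArrayList<>();

  PolicyAwareFields(ConflictPolicy policy) {
    this.policy = policy;
  }

  void put(String name, String value) {
    boolean exists = entries.stream().anyMatch(e -> e.getKey().equals(name));
    switch (policy) {
      case CONFLICT_APPEND:
        // Keep every entry, including duplicates.
        entries.add(Map.entry(name, value));
        break;
      case CONFLICT_IGNORE:
        // Keep the first entry and silently drop later ones.
        if (!exists) {
          entries.add(Map.entry(name, value));
        }
        break;
      case CONFLICT_REPLACE:
        // Drop existing entries with this name, then add the new one.
        entries.removeIf(e -> e.getKey().equals(name));
        entries.add(Map.entry(name, value));
        break;
      case CONFLICT_ERROR:
        if (exists) {
          throw new IllegalStateException("Field already exists: " + name);
        }
        entries.add(Map.entry(name, value));
        break;
      default:
        throw new IllegalStateException("Unknown policy: " + policy);
    }
  }

  /** Values in insertion order. */
  List<String> values() {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : entries) {
      out.add(e.getValue());
    }
    return out;
  }
}
```

With this shape, one instance built with `CONFLICT_APPEND` and another built with `CONFLICT_REPLACE` can coexist in the same process, regardless of any global default.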


_arrow_version = load_version_from_pom()
ARROW_TOOLS_JAR = os.environ.get(
6 changes: 5 additions & 1 deletion java/README.md
@@ -80,8 +80,12 @@ variable are set, the system property takes precedence.

## Java Properties

For java 9 or later, should set "-Dio.netty.tryReflectionSetAccessible=true".
* For java 9 or later, should set "-Dio.netty.tryReflectionSetAccessible=true".
This fixes `java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available`. thrown by netty.
* To support duplicate fields in a `StructVector` enable "-Darrow.struct.conflict.policy=CONFLICT_APPEND".
Contributor

We've been using the policy that Java properties should also be settable via environment variables (I believe in some contexts environment variables are easier to set).

Contributor Author

done!
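
A minimal sketch of the lookup order discussed here: the system property is consulted first, then the environment variable, then the backwards-compatible default. `ConflictPolicyResolver` is a hypothetical helper and not Arrow code; the property and environment variable names are the ones this PR introduces. The values are passed in as arguments so the logic can be exercised without changing the real JVM or process environment.

```java
import java.util.Locale;

/** Hypothetical helper; not part of Arrow. */
final class ConflictPolicyResolver {
  enum ConflictPolicy { CONFLICT_APPEND, CONFLICT_IGNORE, CONFLICT_REPLACE, CONFLICT_ERROR }

  /**
   * Resolves the default policy: system property value first, then the
   * environment variable value, then CONFLICT_REPLACE.
   */
  static ConflictPolicy resolve(String propertyValue, String envValue) {
    String raw = propertyValue != null ? propertyValue : envValue;
    if (raw == null) {
      return ConflictPolicy.CONFLICT_REPLACE;
    }
    try {
      return ConflictPolicy.valueOf(raw.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Unrecognised value: keep the backwards-compatible default.
      return ConflictPolicy.CONFLICT_REPLACE;
    }
  }

  public static void main(String[] args) {
    ConflictPolicy policy = resolve(
        System.getProperty("arrow.struct.conflict.policy"),
        System.getenv("ARROW_STRUCT_CONFLICT_POLICY"));
    System.out.println(policy);
  }
}
```

Note that the property is read without a default value. Supplying a default to `System.getProperty` would mean the result is never null, so the environment variable would never be consulted.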

By default, a duplicate field replaces the existing one (`CONFLICT_REPLACE`). To use a different policy for
conflicting or duplicate fields, set this JVM flag or use the corresponding static constructor methods for `StructVector`s.

## Java Code Style Guide

Arrow Java follows the Google style guide [here][3] with the following
11 changes: 9 additions & 2 deletions java/vector/src/main/codegen/templates/DenseUnionVector.java
@@ -24,7 +24,9 @@
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.AbstractStructVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.types.Types;
@@ -132,8 +134,13 @@ public DenseUnionVector(String name, BufferAllocator allocator, FieldType fieldT
this.name = name;
this.allocator = allocator;
this.fieldType = fieldType;
this.internalStruct = new NonNullableStructVector("internal", allocator, INTERNAL_STRUCT_TYPE,
callBack);
this.internalStruct = new NonNullableStructVector(
"internal",
allocator,
INTERNAL_STRUCT_TYPE,
callBack,
AbstractStructVector.ConflictPolicy.CONFLICT_REPLACE,
false);
this.validityBuffer = allocator.getEmpty();
this.validityBufferAllocationSizeInBytes =
DataSizeRoundingUtil.divideBy8Ceil(BaseValueVector.INITIAL_VALUE_ALLOCATION);
11 changes: 9 additions & 2 deletions java/vector/src/main/codegen/templates/UnionVector.java
@@ -22,6 +22,8 @@
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.AbstractStructVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.types.UnionMode;
@@ -117,8 +119,13 @@ public UnionVector(String name, BufferAllocator allocator, FieldType fieldType,
this.name = name;
this.allocator = allocator;
this.fieldType = fieldType;
this.internalStruct = new NonNullableStructVector("internal", allocator, INTERNAL_STRUCT_TYPE,
callBack);
this.internalStruct = new NonNullableStructVector(
"internal",
allocator,
INTERNAL_STRUCT_TYPE,
callBack,
AbstractStructVector.ConflictPolicy.CONFLICT_REPLACE,
false);
this.typeBuffer = allocator.getEmpty();
this.callBack = callBack;
this.typeBufferAllocationSizeInBytes = BaseValueVector.INITIAL_VALUE_ALLOCATION * TYPE_WIDTH;
@@ -19,7 +19,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -54,7 +54,7 @@ public class VectorSchemaRoot implements AutoCloseable {
private Schema schema;
private int rowCount;
private final List<FieldVector> fieldVectors;
private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
private final Map<Field, FieldVector> fieldVectorsMap = new LinkedHashMap<>();
Contributor

Is there a reason not to change this to a Map<String, List>?

Contributor Author

I didn't want to lose the type information. This still allows you to address Field("x", int) compared to Field("x", long).
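
The trade-off described in this reply can be sketched with a small, self-contained model. `FieldKeyedLookup` and its nested `Field` class are hypothetical stand-ins (a name plus a type label, with strings in place of vectors), not Arrow classes. Keying an insertion-ordered map by the whole field keeps `("x", int)` and `("x", long)` addressable separately, while a lookup by name alone becomes a linear scan that returns the first inserted match.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model of a field-keyed vector map; not part of Arrow. */
final class FieldKeyedLookup {
  /** Stand-in for a field: equality covers both name and type. */
  static final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
      this.name = name;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Field)) {
        return false;
      }
      Field other = (Field) o;
      return name.equals(other.name) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, type);
    }
  }

  // LinkedHashMap preserves insertion order, which defines "first inserted".
  private final Map<Field, String> vectors = new LinkedHashMap<>();

  void put(Field field, String vector) {
    vectors.put(field, vector);
  }

  /** Exact lookup by name and type. */
  String get(Field field) {
    return vectors.get(field);
  }

  /** Linear scan by name; returns the first inserted match, or null. */
  String get(String name) {
    for (Map.Entry<Field, String> e : vectors.entrySet()) {
      if (e.getKey().name.equals(name)) {
        return e.getValue();
      }
    }
    return null;
  }
}
```

In this sketch, two fields that are equal in both name and type would still share a single key, so only fields that differ in type remain distinguishable.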



/**
@@ -113,7 +113,7 @@ public VectorSchemaRoot(Schema schema, List<FieldVector> fieldVectors, int rowCo
for (int i = 0; i < schema.getFields().size(); ++i) {
Field field = schema.getFields().get(i);
FieldVector vector = fieldVectors.get(i);
fieldVectorsMap.put(field.getName(), vector);
fieldVectorsMap.put(field, vector);
}
}

@@ -163,8 +163,22 @@ public List<FieldVector> getFieldVectors() {
return fieldVectors.stream().collect(Collectors.toList());
}

/**
* Gets a vector by name.
*
* If the name occurs multiple times, this returns the first inserted entry for that name.
*/
public FieldVector getVector(String name) {
Member

This does change the complexity - though presumably it's not an issue unless someone has tens of thousands of columns or is repeatedly fetching vectors in a tight loop.

Contributor Author

Agreed, this now matches with the Schema implementation. I think we should either:
i) deprecate the name based methods
ii) document that these methods are unsuitable for tight loops.
Any thoughts on which is more sensible?
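
One way a caller could sidestep the linear scan in a tight loop is to resolve names to positions once, up front, and then use the index-based accessor inside the loop. The sketch below is a hypothetical caller-side helper (`NameIndex` is not an Arrow class) that works on a plain list of field names and keeps every position for a duplicated name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical caller-side helper; not part of Arrow. */
final class NameIndex {
  /** Builds name -> positions in a single pass over the field names. */
  static Map<String, List<Integer>> build(List<String> fieldNames) {
    Map<String, List<Integer>> index = new LinkedHashMap<>();
    for (int i = 0; i < fieldNames.size(); i++) {
      index.computeIfAbsent(fieldNames.get(i), k -> new ArrayList<>()).add(i);
    }
    return index;
  }

  /** Position of the first field with this name, or -1 if absent. */
  static int first(Map<String, List<Integer>> index, String name) {
    List<Integer> hits = index.get(name);
    return hits == null ? -1 : hits.get(0);
  }
}
```

The resulting position could then be passed to an index-based lookup such as `getVector(int index)`, so the per-iteration cost no longer depends on the number of columns.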

return fieldVectorsMap.get(name);
for (Map.Entry<Field, FieldVector> entry: fieldVectorsMap.entrySet()) {
if (entry.getKey().getName().equals(name)) {
return entry.getValue();
}
}
return null;
}

public FieldVector getVector(Field field) {
return fieldVectorsMap.get(field);
}

public FieldVector getVector(int index) {
@@ -31,20 +31,78 @@
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.MapWithOrdinal;
import org.apache.arrow.vector.util.PromotableMultiMapWithOrdinal;
import org.apache.arrow.vector.util.ValueVectorUtility;

/**
* Base class for StructVectors. Currently used by NonNullableStructVector
*/
public abstract class AbstractStructVector extends AbstractContainerVector {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);

private static final String STRUCT_CONFLICT_POLICY_ENV = "ARROW_STRUCT_CONFLICT_POLICY";
private static final String STRUCT_CONFLICT_POLICY_JVM = "arrow.struct.conflict.policy";
private static final ConflictPolicy DEFAULT_CONFLICT_POLICY;
// Maintains a map with key as field name and value is the vector itself
private final MapWithOrdinal<String, FieldVector> vectors = new MapWithOrdinal<>();
private final PromotableMultiMapWithOrdinal<String, FieldVector> vectors;
protected final boolean allowConflictPolicyChanges;
private ConflictPolicy conflictPolicy;


static {
// No default here: a default would make the null check below unreachable,
// and the environment variable would never be consulted.
String conflictPolicyStr = System.getProperty(STRUCT_CONFLICT_POLICY_JVM);
if (conflictPolicyStr == null) {
conflictPolicyStr = System.getenv(STRUCT_CONFLICT_POLICY_ENV);
}
ConflictPolicy conflictPolicy;
try {
conflictPolicy = ConflictPolicy.valueOf(conflictPolicyStr.toUpperCase());
} catch (Exception e) {
// Neither source set (null) or an unrecognised value: fall back to the default.
conflictPolicy = ConflictPolicy.CONFLICT_REPLACE;
}
DEFAULT_CONFLICT_POLICY = conflictPolicy;
}

protected AbstractStructVector(String name, BufferAllocator allocator, CallBack callBack) {
/**
* Policy to determine how to react when duplicate columns are encountered.
*/
public enum ConflictPolicy {
// Ignore the conflict and append the field.
CONFLICT_APPEND,
// Keep the existing field and ignore the newer one.
CONFLICT_IGNORE,
// Replace the existing field with the newer one. This is the default behaviour.
CONFLICT_REPLACE,
// Refuse the new field and error out.
CONFLICT_ERROR
}

/**
* Base constructor. A null conflictPolicy falls back to DEFAULT_CONFLICT_POLICY
* (CONFLICT_REPLACE unless configured otherwise).
*/
protected AbstractStructVector(String name,
BufferAllocator allocator,
CallBack callBack,
ConflictPolicy conflictPolicy,
boolean allowConflictPolicyChanges) {
super(name, allocator, callBack);
this.conflictPolicy = conflictPolicy == null ? DEFAULT_CONFLICT_POLICY : conflictPolicy;
this.vectors = new PromotableMultiMapWithOrdinal<>(allowConflictPolicyChanges, this.conflictPolicy);
this.allowConflictPolicyChanges = allowConflictPolicyChanges;
}

/**
* Set conflict policy and return last conflict policy state.
*/
public ConflictPolicy setConflictPolicy(ConflictPolicy conflictPolicy) {
ConflictPolicy tmp = this.conflictPolicy;
this.conflictPolicy = conflictPolicy;
this.vectors.setConflictPolicy(conflictPolicy);
return tmp;
}

public ConflictPolicy getConflictPolicy() {
return conflictPolicy;
}

@Override
@@ -114,7 +172,6 @@ public void reAlloc() {
* @return resultant {@link org.apache.arrow.vector.ValueVector}
* @throws java.lang.IllegalStateException raised if there is a hard schema change
*/
@Override
public <T extends FieldVector> T addOrGet(String childName, FieldType fieldType, Class<T> clazz) {
final ValueVector existing = getChild(childName);
boolean create = false;
@@ -157,25 +214,22 @@ public ValueVector getChildByOrdinal(int id) {
* Returns a {@link org.apache.arrow.vector.ValueVector} instance of subtype of T corresponding to the given
* field name if exists or null.
*
* If there is more than one element for name this will return the first inserted.
*
* @param name the name of the child to return
* @param clazz the expected type of the child
* @return the child corresponding to this name
*/
@Override
public <T extends FieldVector> T getChild(String name, Class<T> clazz) {
final ValueVector v = vectors.get(name);
if (v == null) {
final FieldVector f = vectors.get(name);
if (f == null) {
return null;
}
return typeify(v, clazz);
return typeify(f, clazz);
}

protected ValueVector add(String childName, FieldType fieldType) {
final ValueVector existing = getChild(childName);
if (existing != null) {
throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ",
existing.getClass().getSimpleName(), fieldType));
}
FieldVector vector = fieldType.createNewSingleVector(childName, allocator, callBack);
putChild(childName, vector);
if (callBack != null) {
@@ -196,21 +250,55 @@ protected void putChild(String name, FieldVector vector) {
putVector(name, vector);
}

private void put(String name, FieldVector vector, boolean overwrite) {
final boolean old = vectors.put(
Preconditions.checkNotNull(name, "field name cannot be null"),
Preconditions.checkNotNull(vector, "vector cannot be null"),
overwrite
);
if (old) {
logger.debug("Field [{}] mutated to [{}] ", name,
vector.getClass().getSimpleName());
}
}

/**
* Inserts the input vector into the map if it does not exist, replaces if it exists already.
* Inserts the input vector into the map if it does not exist.
*
* <p>
* If the field name already exists the conflict is handled according to the currently set ConflictPolicy
* </p>
*
* @param name field name
* @param vector vector to be inserted
*/
protected void putVector(String name, FieldVector vector) {
final ValueVector old = vectors.put(
Preconditions.checkNotNull(name, "field name cannot be null"),
Preconditions.checkNotNull(vector, "vector cannot be null")
);
if (old != null && old != vector) {
logger.debug("Field [{}] mutated from [{}] to [{}]", name, old.getClass().getSimpleName(),
vector.getClass().getSimpleName());
switch (conflictPolicy) {
case CONFLICT_APPEND:
put(name, vector, false);
break;
case CONFLICT_IGNORE:
if (!vectors.containsKey(name)) {
put(name, vector, false);
}
break;
case CONFLICT_REPLACE:
if (vectors.containsKey(name)) {
vectors.removeAll(name);
}
put(name, vector, true);
break;
case CONFLICT_ERROR:
if (vectors.containsKey(name)) {
throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ",
vector.getClass().getSimpleName(), vector.getField().getFieldType()));
}
put(name, vector, false);
break;
default:
throw new IllegalStateException(String.format("%s type not a valid conflict state", conflictPolicy));
}

}

/**
@@ -284,7 +372,7 @@ private List<ValueVector> getPrimitiveVectors(FieldVector v) {
}

/**
* Get a child vector by name.
* Get a child vector by name. If the name is duplicated, this returns the first inserted child.
* @param name the name of the child to return
* @return a vector with its corresponding ordinal mapping if field exists or null.
*/
@@ -333,4 +421,5 @@ public int getBufferSize() {
public String toString() {
return ValueVectorUtility.getToString(this, 0 , getValueCount());
}

}