Closed
13 changes: 12 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -499,6 +499,7 @@ public static class Builder {
     @Nullable private Factory<List<FieldValueGetter>> fieldValueGetterFactory;
     @Nullable private Object getterTarget;
     private Schema schema;
+    private boolean collectionHandledByGetter = false;

     Builder(Schema schema) {
       this.schema = schema;
@@ -554,6 +555,12 @@ public Builder withFieldValueGetters(
       return this;
     }

+    /** The FieldValueGetters will handle the conversion for Arrays, Maps and Rows. */
+    public Builder withFieldValueGettersHandleCollections(boolean collectionHandledByGetter) {
+      this.collectionHandledByGetter = collectionHandledByGetter;
+      return this;
+    }
Contributor

I'm having trouble understanding what this is for. Can you explain?

Contributor Author

OK, maybe the name could be better, but it means that the FieldValueGetters also handle collections like ARRAY, ROW and MAP. If you go to the original implementation (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java#L90), you can see that RowWithGetters does naive handling of ARRAY, ROW and MAP. For protobuf you need more context (the descriptor) to handle them. That's why I need to disable the naive mapping and let the FieldValueGetters handle ARRAY, ROW and MAP.

Feel free to suggest a better name though.
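To make the distinction in the reply above concrete, here is a minimal sketch of "naive" row-level handling versus a getter that owns the conversion because it holds extra context. It does not use Beam or protobuf classes: `GetterCollectionSketch`, `naiveList`, `descriptorAwareGetter`, and the enum-name table standing in for a descriptor are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-ins only; none of these are Beam or protobuf types. */
final class GetterCollectionSketch {
  private GetterCollectionSketch() {}

  /** Naive row-level handling: copy each element through unchanged, with no extra context. */
  static List<Object> naiveList(Iterable<?> raw) {
    List<Object> out = new ArrayList<>();
    for (Object o : raw) {
      out.add(o);
    }
    return out;
  }

  /**
   * A getter that performs the element conversion itself because it holds the context
   * (assumed here to be a number-to-name table, standing in for a descriptor).
   */
  static Function<List<Integer>, List<Object>> descriptorAwareGetter(
      Map<Integer, String> enumNames) {
    return raw -> {
      List<Object> out = new ArrayList<>();
      for (Integer number : raw) {
        out.add(enumNames.getOrDefault(number, "UNRECOGNIZED"));
      }
      return out;
    };
  }

  public static void main(String[] args) {
    List<Integer> wire = Arrays.asList(0, 2);
    Map<Integer, String> names = new HashMap<>();
    names.put(0, "RED");
    names.put(2, "BLUE");
    System.out.println(naiveList(wire)); // prints [0, 2]
    System.out.println(descriptorAwareGetter(names).apply(wire)); // prints [RED, BLUE]
  }
}
```

In this sketch the row has nothing useful to add once the getter has converted the list, which is the situation the flag is meant to signal.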

Member

@reuvenlax is this the last remaining concern you have for this PR?

#8690 (comment) is a good reference for the motivation of this as well.

I'm not crazy about the way this is implemented since it's adding state to RowWithGetters that will get checked every time a collection field is accessed. I can't think of a better way to do it without some non-trivial refactoring though. Some ideas:

  1. Rather than storing collectionHandledByGetter in RowWithGetters and changing behavior based on it, have two alternate RowWithGetters implementations, one with the special handling for collections and one without. I think this is still an improvement over the separate ProtoRow class that was rejected since it's not explicitly tied to a particular SchemaProvider, in fact it sounds like it could be re-used for Avro GenericRecord instances.
  2. Move the special logic for collection getters into those SchemaProvider/FieldValueGetter implementations that need it, and make RowWithGetters always behave as if collectionHandledByGetter is true. I think this could be a cleaner approach? But it's challenging because that special logic stores cached values that wouldn't be appropriate to move into the FieldValueGetter implementations. Maybe a SchemaProvider could have some way to indicate that a certain row is expensive to access and should be cached in RowWithGetters?
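The second idea above could look roughly like the following sketch, in which the getter itself tells the row whether its result is expensive enough to cache. This is only an illustration under stated assumptions: `CacheHintGetter`, `expensive()`, and `HintedRow` are hypothetical names, not part of Beam's `FieldValueGetter` or `SchemaProvider` APIs, and the `HashMap` cache is not thread-safe.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical getter contract: the getter says whether its result is worth caching. */
interface CacheHintGetter {
  Object get(Object target);

  /** True if computing the value is expensive and the row should memoize it. */
  boolean expensive();
}

/** Hypothetical row: a single implementation whose caching is driven by the getter's hint. */
final class HintedRow {
  private final List<CacheHintGetter> getters;
  private final Object target;
  // Keyed by field index; not thread-safe in this sketch.
  private final Map<Integer, Object> cache = new HashMap<>();

  HintedRow(List<CacheHintGetter> getters, Object target) {
    this.getters = getters;
    this.target = target;
  }

  Object getValue(int fieldIdx) {
    CacheHintGetter getter = getters.get(fieldIdx);
    if (!getter.expensive()) {
      // Cheap fields are read straight from the target on every access.
      return getter.get(target);
    }
    // Expensive fields are computed once per field index and then reused.
    return cache.computeIfAbsent(fieldIdx, i -> getter.get(target));
  }
}
```

With this shape the row needs no per-type branching for ARRAY, MAP, or ROW; the trade-off is that every getter implementation has to declare a hint.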

Contributor Author

I've refactored based on your input: I've created a RowWithGettersCachedCollections class that inherits from RowWithGetters. The cached version is the default.

Contributor

Can you help me understand this a bit more? Why does it not work to cache lists for protocol buffers? We saw repeated array conversion to be a big problem (which is why we cache them). I'm wondering if we could instead cache a lazy array like we do with iterables.

I'll take a closer look at this code to figure it out.
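One way to read the "lazy array" suggestion above is a list view that converts each element the first time it is read and remembers the result, so the row can cache the view without converting the whole collection up front. The sketch below assumes the source is a fixed-size `List` with cheap indexed access (the PR's code receives an `Iterable`); `LazyConvertingList` is a hypothetical name and is not an existing Beam class.

```java
import java.util.AbstractList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical lazy view: converts an element on first read, then reuses the result. */
final class LazyConvertingList<S, T> extends AbstractList<T> {
  private final List<S> source;
  private final Function<S, T> convert;
  private final Object[] converted;
  // Tracks which slots are filled, so a converted null is not recomputed.
  private final boolean[] done;

  LazyConvertingList(List<S> source, Function<S, T> convert) {
    this.source = source;
    this.convert = convert;
    this.converted = new Object[source.size()];
    this.done = new boolean[source.size()];
  }

  @Override
  @SuppressWarnings("unchecked")
  public T get(int index) {
    if (!done[index]) {
      converted[index] = convert.apply(source.get(index));
      done[index] = true;
    }
    return (T) converted[index];
  }

  @Override
  public int size() {
    return source.size();
  }
}
```

Because `AbstractList` supplies iteration, `equals`, and `hashCode` on top of `get` and `size`, the view behaves like an ordinary read-only list while paying the conversion cost only for elements that are actually accessed.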

Contributor Author

Should we make a ticket out of this, or is this blocking?


     private List<Object> verify(Schema schema, List<Object> values) {
       List<Object> verifiedValues = Lists.newArrayListWithCapacity(values.size());
       if (schema.getFieldCount() != values.size()) {
@@ -754,7 +761,11 @@ public Row build() {
         return new RowWithStorage(schema, storageValues);
       } else if (fieldValueGetterFactory != null) {
         checkState(getterTarget != null, "getters require withGetterTarget.");
-        return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget);
+        if (collectionHandledByGetter) {
+          return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget);
+        } else {
+          return new RowWithGettersCachedCollections(schema, fieldValueGetterFactory, getterTarget);
+        }
       } else {
         return new RowWithStorage(schema, Collections.emptyList());
       }
sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.values;

 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -27,9 +26,6 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

 /**
  * A Concrete subclass of {@link Row} that delegates to a set of provided {@link FieldValueGetter}s.
@@ -39,12 +35,9 @@
  * the appropriate fields from the POJO.
  */
 public class RowWithGetters extends Row {
-  private final Factory<List<FieldValueGetter>> fieldValueGetterFactory;
-  private final Object getterTarget;
-  private final List<FieldValueGetter> getters;
-
-  private final Map<Integer, List> cachedLists = Maps.newHashMap();
-  private final Map<Integer, Map> cachedMaps = Maps.newHashMap();
+  final Factory<List<FieldValueGetter>> fieldValueGetterFactory;
+  final Object getterTarget;
+  final List<FieldValueGetter> getters;

   RowWithGetters(
       Schema schema, Factory<List<FieldValueGetter>> getterFactory, Object getterTarget) {
@@ -67,44 +60,9 @@ public <T> T getValue(int fieldIdx) {
     return fieldValue != null ? getValue(type, fieldValue, fieldIdx) : null;
   }

-  private List getListValue(FieldType elementType, Object fieldValue) {
-    Iterable iterable = (Iterable) fieldValue;
-    List<Object> list = Lists.newArrayList();
-    for (Object o : iterable) {
-      list.add(getValue(elementType, o, null));
-    }
-    return list;
-  }
-
-  private Map<?, ?> getMapValue(FieldType keyType, FieldType valueType, Map<?, ?> fieldValue) {
-    Map returnMap = Maps.newHashMap();
-    for (Map.Entry<?, ?> entry : fieldValue.entrySet()) {
-      returnMap.put(
-          getValue(keyType, entry.getKey(), null), getValue(valueType, entry.getValue(), null));
-    }
-    return returnMap;
-  }
-
   @SuppressWarnings({"TypeParameterUnusedInFormals", "unchecked"})
-  private <T> T getValue(FieldType type, Object fieldValue, @Nullable Integer cacheKey) {
-    if (type.getTypeName().equals(TypeName.ROW)) {
-      return (T) new RowWithGetters(type.getRowSchema(), fieldValueGetterFactory, fieldValue);
-    } else if (type.getTypeName().equals(TypeName.ARRAY)) {
-      return cacheKey != null
-          ? (T)
-              cachedLists.computeIfAbsent(
-                  cacheKey, i -> getListValue(type.getCollectionElementType(), fieldValue))
-          : (T) getListValue(type.getCollectionElementType(), fieldValue);
-    } else if (type.getTypeName().equals(TypeName.MAP)) {
-      Map map = (Map) fieldValue;
-      return cacheKey != null
-          ? (T)
-              cachedMaps.computeIfAbsent(
-                  cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
-          : (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
-    } else {
-      return (T) fieldValue;
-    }
+  protected <T> T getValue(FieldType type, Object fieldValue, @Nullable Integer cacheKey) {
+    return (T) fieldValue;
   }

   @Override
sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGettersCachedCollections.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+/**
+ * A concrete subclass of {@link Row} that delegates to a set of provided {@link FieldValueGetter}s.
+ * This is a special version of {@link RowWithGetters} that caches the map and list collections.
+ *
+ * <p>This allows us to have {@link Row} objects for which the actual storage is in another object.
+ * For example, the user's type may be a POJO, in which case the provided getters will simply read
+ * the appropriate fields from the POJO.
+ */
+public class RowWithGettersCachedCollections extends RowWithGetters {
+  private final Map<Integer, List> cachedLists = Maps.newHashMap();
+  private final Map<Integer, Map> cachedMaps = Maps.newHashMap();
+
+  RowWithGettersCachedCollections(
+      Schema schema, Factory<List<FieldValueGetter>> getterFactory, Object getterTarget) {
+    super(schema, getterFactory, getterTarget);
+  }
+
+  private List getListValue(FieldType elementType, Object fieldValue) {
+    Iterable iterable = (Iterable) fieldValue;
+    List<Object> list = Lists.newArrayList();
+    for (Object o : iterable) {
+      list.add(getValue(elementType, o, null));
+    }
+    return list;
+  }
+
+  private Map<?, ?> getMapValue(FieldType keyType, FieldType valueType, Map<?, ?> fieldValue) {
+    Map returnMap = Maps.newHashMap();
+    for (Map.Entry<?, ?> entry : fieldValue.entrySet()) {
+      returnMap.put(
+          getValue(keyType, entry.getKey(), null), getValue(valueType, entry.getValue(), null));
+    }
+    return returnMap;
+  }
+
+  @SuppressWarnings({"TypeParameterUnusedInFormals", "unchecked"})
+  @Override
+  protected <T> T getValue(FieldType type, Object fieldValue, @Nullable Integer cacheKey) {
+    if (type.getTypeName().equals(TypeName.ROW)) {
+      return (T)
+          new RowWithGettersCachedCollections(
+              type.getRowSchema(), fieldValueGetterFactory, fieldValue);
+    } else if (type.getTypeName().equals(TypeName.ARRAY)) {
+      return cacheKey != null
+          ? (T)
+              cachedLists.computeIfAbsent(
+                  cacheKey, i -> getListValue(type.getCollectionElementType(), fieldValue))
+          : (T) getListValue(type.getCollectionElementType(), fieldValue);
+    } else if (type.getTypeName().equals(TypeName.MAP)) {
+      Map map = (Map) fieldValue;
+      return cacheKey != null
+          ? (T)
+              cachedMaps.computeIfAbsent(
+                  cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
+          : (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
+    } else {
+      return (T) fieldValue;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (o instanceof RowWithGettersCachedCollections) {
+      RowWithGettersCachedCollections other = (RowWithGettersCachedCollections) o;
+      return Objects.equals(getSchema(), other.getSchema())
+          && Objects.equals(getterTarget, other.getterTarget);
+    } else if (o instanceof Row) {
+      return super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getSchema(), getterTarget);
+  }
+}
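
The split introduced by this change (a pass-through base class, a caching subclass, and a builder flag that picks between them) can be seen in isolation in the simplified sketch below. It is a stand-alone illustration, not Beam code: `PassThroughRow`, `CachingRow`, `RowSelectionSketch`, and the `convert` hook are hypothetical names, getters are modelled as plain `Function` objects, and only list handling is shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for the base row: returns whatever the getter produced. */
class PassThroughRow {
  final List<Function<Object, Object>> getters;
  final Object target;

  PassThroughRow(List<Function<Object, Object>> getters, Object target) {
    this.getters = getters;
    this.target = target;
  }

  final Object getValue(int fieldIdx) {
    Object raw = getters.get(fieldIdx).apply(target);
    return raw != null ? convert(raw, fieldIdx) : null;
  }

  /** Hook that subclasses override; the base class does no conversion. */
  protected Object convert(Object raw, int fieldIdx) {
    return raw;
  }
}

/** Simplified stand-in for the caching subclass: copies an iterable once per field index. */
class CachingRow extends PassThroughRow {
  private final Map<Integer, List<Object>> cachedLists = new HashMap<>();

  CachingRow(List<Function<Object, Object>> getters, Object target) {
    super(getters, target);
  }

  @Override
  protected Object convert(Object raw, int fieldIdx) {
    if (raw instanceof Iterable) {
      return cachedLists.computeIfAbsent(
          fieldIdx,
          i -> {
            List<Object> copy = new ArrayList<>();
            for (Object o : (Iterable<?>) raw) {
              copy.add(o);
            }
            return copy;
          });
    }
    return raw;
  }
}

/** Mirrors the choice made in build(): the flag picks the implementation. */
final class RowSelectionSketch {
  private RowSelectionSketch() {}

  static PassThroughRow build(
      List<Function<Object, Object>> getters, Object target, boolean collectionHandledByGetter) {
    return collectionHandledByGetter
        ? new PassThroughRow(getters, target)
        : new CachingRow(getters, target);
  }
}
```

Choosing the class once at build time means no flag has to be checked on each field access, which addresses the per-access state check raised in the review.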