From fcdc0c33cc5d847bb530a2b7715a9bc98a7b990a Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Mon, 11 Apr 2016 09:08:23 -0700 Subject: [PATCH 01/12] Add DisplayData for combine transforms --- .../sdk/transforms/ApproximateQuantiles.java | 8 + .../sdk/transforms/ApproximateUnique.java | 40 +++++ .../apache/beam/sdk/transforms/Combine.java | 154 ++++++++++++++---- .../beam/sdk/transforms/CombineFnBase.java | 27 ++- .../beam/sdk/transforms/CombineFns.java | 30 ++++ .../sdk/transforms/CombineWithContext.java | 6 + .../org/apache/beam/sdk/transforms/Max.java | 6 + .../org/apache/beam/sdk/transforms/Min.java | 6 + .../apache/beam/sdk/transforms/Sample.java | 14 ++ .../org/apache/beam/sdk/transforms/Top.java | 8 + .../sdk/transforms/display/DisplayData.java | 86 +++++++++- .../sdk/transforms/display/JavaClass.java | 94 +++++++++++ .../apache/beam/sdk/util/CombineFnUtil.java | 13 ++ .../transforms/ApproximateQuantilesTest.java | 13 ++ .../sdk/transforms/ApproximateUniqueTest.java | 17 ++ .../beam/sdk/transforms/CombineFnsTest.java | 61 ++++++- .../beam/sdk/transforms/CombineTest.java | 22 ++- .../apache/beam/sdk/transforms/MaxTest.java | 13 +- .../apache/beam/sdk/transforms/MinTest.java | 13 +- .../beam/sdk/transforms/SampleTest.java | 14 ++ .../apache/beam/sdk/transforms/TopTest.java | 13 ++ .../display/DisplayDataMatchers.java | 3 +- .../transforms/display/DisplayDataTest.java | 18 +- .../sdk/transforms/display/JavaClassTest.java | 65 ++++++++ .../display/JavaClassJava8Test.java | 45 +++++ .../beam/sdk/transforms/CombineJava8Test.java | 36 ++++ 26 files changed, 774 insertions(+), 51 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java create mode 100644 sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index 2ed7a8597008..c58c7368d36e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WeightedValue; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; @@ -359,6 +360,13 @@ public Coder> getAccumulatorCoder( CoderRegistry registry, Coder elementCoder) { return new QuantileStateCoder<>(compareFn, elementCoder); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numQuantiles", numQuantiles) + .add("comparer", compareFn.getClass()); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java index 4f9dfc4ce5fa..c7812e4d4d49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -167,6 +168,11 @@ static class Globally extends PTransform, PCollection> { */ private final long sampleSize; + /** + * The the desired maximum estimation error, or null if not specified. + */ + private final Double maximumEstimationError; + /** * @see ApproximateUnique#globally(int) */ @@ -178,7 +184,9 @@ public Globally(int sampleSize) { + "In general, the estimation " + "error is about 2 / sqrt(sampleSize)."); } + this.sampleSize = sampleSize; + this.maximumEstimationError = null; } /** @@ -190,7 +198,9 @@ public Globally(double maximumEstimationError) { "ApproximateUnique needs an " + "estimation error between 1% (0.01) and 50% (0.5)."); } + this.sampleSize = sampleSizeFromEstimationError(maximumEstimationError); + this.maximumEstimationError = maximumEstimationError; } @Override @@ -200,6 +210,11 @@ public PCollection apply(PCollection input) { Combine.globally( new ApproximateUniqueCombineFn<>(sampleSize, coder))); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError); + } } /** @@ -213,8 +228,17 @@ public PCollection apply(PCollection input) { static class PerKey extends PTransform>, PCollection>> { + /** + * The number of entries in the statistical sample; the higher this number, + * the more accurate the estimate will be. + */ private final long sampleSize; + /** + * The the desired maximum estimation error, if specified. + */ + private final Double maximumEstimationError; + /** * @see ApproximateUnique#perKey(int) */ @@ -225,7 +249,9 @@ public PerKey(int sampleSize) { + "sampleSize >= 16 for an estimation error <= 50%. In general, " + "the estimation error is about 2 / sqrt(sampleSize)."); } + this.sampleSize = sampleSize; + this.maximumEstimationError = null; } /** @@ -237,7 +263,9 @@ public PerKey(double estimationError) { "ApproximateUnique.PerKey needs an " + "estimation error between 1% (0.01) and 50% (0.5)."); } + this.sampleSize = sampleSizeFromEstimationError(estimationError); + this.maximumEstimationError = estimationError; } @Override @@ -254,6 +282,11 @@ public PCollection> apply(PCollection> input) { Combine.perKey(new ApproximateUniqueCombineFn<>( sampleSize, coder).asKeyedFn())); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError); + } } @@ -418,4 +451,11 @@ static long hash(T element, Coder coder) throws CoderException, IOExcepti static long sampleSizeFromEstimationError(double estimationError) { return Math.round(Math.ceil(4.0 / Math.pow(estimationError, 2.0))); } + + private static void populateDisplayData( + DisplayData.Builder builder, long sampleSize, Double maxEstimationError) { + builder + .add("sampleSize", sampleSize) + .addIfNotNull("maximumEstimationError", maxEstimationError); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 28bbeed285fc..1eece1787f82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; @@ -37,6 +36,9 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.display.JavaClass; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -101,9 +103,10 @@ private Combine() { */ public static Globally globally( SerializableFunction, V> combiner) { - return globally(IterableCombineFn.of(combiner)); + return globally(IterableCombineFn.of(combiner), JavaClass.fromInstance(combiner)); } + /** * Returns a {@link Globally Combine.Globally} {@code PTransform} * that uses the given {@code GloballyCombineFn} to combine all @@ -121,7 +124,12 @@ public static Globally globally( */ public static Globally globally( GlobalCombineFn fn) { - return new Globally<>(fn, true, 0); + return globally(fn, JavaClass.fromInstance(fn)); + } + + private static Globally globally( + GlobalCombineFn fn, JavaClass fnClass) { + return new Globally<>(fn, fnClass, true, 0); } /** @@ -142,7 +150,7 @@ public static Globally globally( */ public static PerKey perKey( SerializableFunction, V> fn) { - return perKey(Combine.IterableCombineFn.of(fn)); + return perKey(IterableCombineFn.of(fn).asKeyedFn(), JavaClass.fromInstance(fn)); } /** @@ -163,7 +171,7 @@ public static PerKey perKey( */ public static PerKey perKey( GlobalCombineFn fn) { - return perKey(fn.asKeyedFn()); + return perKey(fn.asKeyedFn(), JavaClass.fromInstance(fn)); } /** @@ -184,7 +192,12 @@ public static PerKey perKey( */ public static PerKey perKey( PerKeyCombineFn fn) { - return new PerKey<>(fn, false /*fewKeys*/); + return perKey(fn, JavaClass.fromInstance(fn)); + } + + private static PerKey perKey( + PerKeyCombineFn fn, JavaClass fnClass) { + return new PerKey<>(fn, fnClass, false /*fewKeys*/); } /** @@ -192,8 +205,8 @@ public static PerKey perKey( * in {@link GroupByKey}. */ private static PerKey fewKeys( - PerKeyCombineFn fn) { - return new PerKey<>(fn, true /*fewKeys*/); + PerKeyCombineFn fn, JavaClass fnClass) { + return new PerKey<>(fn, fnClass, true /*fewKeys*/); } /** @@ -219,7 +232,7 @@ private static PerKey fewKeys( */ public static GroupedValues groupedValues( SerializableFunction, V> fn) { - return groupedValues(IterableCombineFn.of(fn)); + return groupedValues(IterableCombineFn.of(fn).asKeyedFn(), JavaClass.fromInstance(fn)); } /** @@ -245,7 +258,7 @@ public static GroupedValues groupedValues( */ public static GroupedValues groupedValues( GlobalCombineFn fn) { - return groupedValues(fn.asKeyedFn()); + return groupedValues(fn.asKeyedFn(), JavaClass.fromInstance(fn)); } /** @@ -271,9 +284,13 @@ public static GroupedValues groupedValu */ public static GroupedValues groupedValues( PerKeyCombineFn fn) { - return new GroupedValues<>(fn); + return groupedValues(fn, JavaClass.fromInstance(fn)); } + private static GroupedValues groupedValues( + PerKeyCombineFn fn, JavaClass fnClass) { + return new GroupedValues<>(fn, fnClass); + } ///////////////////////////////////////////////////////////////////////////// @@ -495,6 +512,11 @@ public Coder getDefaultOutputCoder( public CombineFn forKey(K key, Coder keyCoder) { return CombineFn.this; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + CombineFn.this.populateDisplayData(builder); + } }; } } @@ -1168,6 +1190,11 @@ public Coder getDefaultOutputCoder( CoderRegistry registry, Coder inputCoder) throws CannotProvideCoderException { return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + KeyedCombineFn.this.populateDisplayData(builder); + } }; } @@ -1233,31 +1260,35 @@ public static class Globally extends PTransform, PCollection> { private final GlobalCombineFn fn; + private final JavaClass fnClass; private final boolean insertDefault; private final int fanout; private final List> sideInputs; - private Globally(GlobalCombineFn fn, + private Globally(GlobalCombineFn fn, JavaClass fnClass, boolean insertDefault, int fanout) { this.fn = fn; + this.fnClass = fnClass; this.insertDefault = insertDefault; this.fanout = fanout; this.sideInputs = ImmutableList.>of(); } private Globally(String name, GlobalCombineFn fn, - boolean insertDefault, int fanout) { + JavaClass fnClass, boolean insertDefault, int fanout) { super(name); this.fn = fn; + this.fnClass = fnClass; this.insertDefault = insertDefault; this.fanout = fanout; this.sideInputs = ImmutableList.>of(); } private Globally(String name, GlobalCombineFn fn, - boolean insertDefault, int fanout, List> sideInputs) { + JavaClass fnClass, boolean insertDefault, int fanout, List> sideInputs) { super(name); this.fn = fn; + this.fnClass = fnClass; this.insertDefault = insertDefault; this.fanout = fanout; this.sideInputs = sideInputs; @@ -1268,7 +1299,7 @@ private Globally(String name, GlobalCombineFn fn, * specified name. Does not modify this transform. */ public Globally named(String name) { - return new Globally<>(name, fn, insertDefault, fanout); + return new Globally<>(name, fn, fnClass, insertDefault, fanout); } /** @@ -1279,7 +1310,7 @@ public Globally named(String name) { * to an empty input set will be returned. */ public GloballyAsSingletonView asSingletonView() { - return new GloballyAsSingletonView<>(fn, insertDefault, fanout); + return new GloballyAsSingletonView<>(fn, fnClass, insertDefault, fanout); } /** @@ -1288,7 +1319,7 @@ public GloballyAsSingletonView asSingletonView() { * is not globally windowed and the output is not being used as a side input. */ public Globally withoutDefaults() { - return new Globally<>(name, fn, false, fanout); + return new Globally<>(name, fn, fnClass, false, fanout); } /** @@ -1299,7 +1330,7 @@ public Globally withoutDefaults() { * that will be used. */ public Globally withFanout(int fanout) { - return new Globally<>(name, fn, insertDefault, fanout); + return new Globally<>(name, fn, fnClass, insertDefault, fanout); } /** @@ -1309,7 +1340,7 @@ public Globally withFanout(int fanout) { public Globally withSideInputs( Iterable> sideInputs) { Preconditions.checkState(fn instanceof RequiresContextInternal); - return new Globally(name, fn, insertDefault, fanout, + return new Globally(name, fn, fnClass, insertDefault, fanout, ImmutableList.>copyOf(sideInputs)); } @@ -1320,7 +1351,7 @@ public PCollection apply(PCollection input) { .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())); Combine.PerKey combine = - Combine.fewKeys(fn.asKeyedFn()); + Combine.fewKeys(fn.asKeyedFn(), fnClass); if (!sideInputs.isEmpty()) { combine = combine.withSideInputs(sideInputs); } @@ -1344,6 +1375,12 @@ public PCollection apply(PCollection input) { } } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.populateDisplayData(builder, fn, fnClass); + Combine.populateGlobalDisplayData(builder, fanout, insertDefault); + } + private PCollection insertDefaultValueIfEmpty(PCollection maybeEmpty) { final PCollectionView> maybeEmptyView = maybeEmpty.apply( View.asIterable()); @@ -1370,6 +1407,20 @@ public void processElement(DoFn.ProcessContext c) { } } + private static void populateDisplayData( + DisplayData.Builder builder, HasDisplayData fn, JavaClass fnClass) { + builder + .include(fn, fnClass) + .add("combineFn", fnClass); + } + + private static void populateGlobalDisplayData( + DisplayData.Builder builder, int fanout, boolean insertDefault) { + builder + .addIfNotDefault("fanout", fanout, 0) + .add("emitDefaultOnEmptyInput", insertDefault); + } + /** * {@code Combine.GloballyAsSingletonView} takes a {@code PCollection} * and returns a {@code PCollectionView} whose elements are the result of @@ -1413,12 +1464,15 @@ public static class GloballyAsSingletonView extends PTransform, PCollectionView> { private final GlobalCombineFn fn; + private final JavaClass fnClass; private final boolean insertDefault; private final int fanout; private GloballyAsSingletonView( - GlobalCombineFn fn, boolean insertDefault, int fanout) { + GlobalCombineFn fn, JavaClass fnClass, + boolean insertDefault, int fanout) { this.fn = fn; + this.fnClass = fnClass; this.insertDefault = insertDefault; this.fanout = fanout; } @@ -1449,6 +1503,12 @@ public boolean getInsertDefault() { public GlobalCombineFn getCombineFn() { return fn; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.populateDisplayData(builder, fn, fnClass); + Combine.populateGlobalDisplayData(builder, fanout, insertDefault); + } } /** @@ -1528,6 +1588,11 @@ public List compact(List accumulator) { return accumulator.size() > 1 ? mergeToSingleton(accumulator) : accumulator; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("combineFn", combiner.getClass()); + } + private List mergeToSingleton(Iterable values) { List singleton = new ArrayList<>(); singleton.add(combiner.apply(values)); @@ -1601,30 +1666,35 @@ public static class PerKey extends PTransform>, PCollection>> { private final transient PerKeyCombineFn fn; + private final JavaClass fnClass; private final boolean fewKeys; private final List> sideInputs; private PerKey( - PerKeyCombineFn fn, boolean fewKeys) { + PerKeyCombineFn fn, JavaClass fnClass, + boolean fewKeys) { this.fn = fn; + this.fnClass = fnClass; this.fewKeys = fewKeys; this.sideInputs = ImmutableList.of(); } private PerKey(String name, - PerKeyCombineFn fn, + PerKeyCombineFn fn, JavaClass fnClass, boolean fewKeys, List> sideInputs) { super(name); this.fn = fn; + this.fnClass = fnClass; this.fewKeys = fewKeys; this.sideInputs = sideInputs; } private PerKey( - String name, PerKeyCombineFn fn, + String name, PerKeyCombineFn fn, JavaClass fnClass, boolean fewKeys) { super(name); this.fn = fn; + this.fnClass = fnClass; this.fewKeys = fewKeys; this.sideInputs = ImmutableList.of(); } @@ -1634,7 +1704,7 @@ private PerKey( * specified name. Does not modify this transform. */ public PerKey named(String name) { - return new PerKey(name, fn, fewKeys); + return new PerKey(name, fn, fnClass, fewKeys); } /** @@ -1644,7 +1714,7 @@ public PerKey named(String name) { public PerKey withSideInputs( Iterable> sideInputs) { Preconditions.checkState(fn instanceof RequiresContextInternal); - return new PerKey(name, fn, fewKeys, + return new PerKey(name, fn, fnClass, fewKeys, ImmutableList.>copyOf(sideInputs)); } @@ -1661,7 +1731,7 @@ public PerKey withSideInputs( */ public PerKeyWithHotKeyFanout withHotKeyFanout( SerializableFunction hotKeyFanout) { - return new PerKeyWithHotKeyFanout(name, fn, hotKeyFanout); + return new PerKeyWithHotKeyFanout(name, fn, fnClass, hotKeyFanout); } /** @@ -1669,7 +1739,7 @@ public PerKeyWithHotKeyFanout withHotKeyFanout( * constant value for every key. */ public PerKeyWithHotKeyFanout withHotKeyFanout(final int hotKeyFanout) { - return new PerKeyWithHotKeyFanout(name, fn, + return new PerKeyWithHotKeyFanout(name, fn, fnClass, new SerializableFunction(){ @Override public Integer apply(K unused) { @@ -1698,6 +1768,11 @@ public PCollection> apply(PCollection> input) { .apply(GroupByKey.create(fewKeys)) .apply(Combine.groupedValues(fn).withSideInputs(sideInputs)); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.populateDisplayData(builder, fn, fnClass); + } } /** @@ -1707,13 +1782,16 @@ public static class PerKeyWithHotKeyFanout extends PTransform>, PCollection>> { private final transient PerKeyCombineFn fn; + private final JavaClass fnClass; private final SerializableFunction hotKeyFanout; private PerKeyWithHotKeyFanout(String name, PerKeyCombineFn fn, + JavaClass fnClass, SerializableFunction hotKeyFanout) { super(name); this.fn = fn; + this.fnClass = fnClass; this.hotKeyFanout = hotKeyFanout; } @@ -1996,6 +2074,12 @@ public void processElement(ProcessContext c) { .apply("PostCombine", Combine.perKey(postCombine)); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.populateDisplayData(builder, fn, fnClass); + builder.add("fanoutFn", hotKeyFanout.getClass()); + } + /** * Used to store either an input or accumulator value, for flattening * the hot and cold key paths. @@ -2137,23 +2221,28 @@ public static class GroupedValues PCollection>> { private final PerKeyCombineFn fn; + private final JavaClass fnClass; private final List> sideInputs; - private GroupedValues(PerKeyCombineFn fn) { + private GroupedValues( + PerKeyCombineFn fn, JavaClass fnClass) { this.fn = SerializableUtils.clone(fn); + this.fnClass = fnClass; this.sideInputs = ImmutableList.>of(); } private GroupedValues( PerKeyCombineFn fn, + JavaClass fnClass, List> sideInputs) { this.fn = SerializableUtils.clone(fn); + this.fnClass = fnClass; this.sideInputs = sideInputs; } public GroupedValues withSideInputs( Iterable> sideInputs) { - return new GroupedValues<>(fn, ImmutableList.>copyOf(sideInputs)); + return new GroupedValues<>(fn, fnClass, ImmutableList.>copyOf(sideInputs)); } /** @@ -2240,5 +2329,10 @@ public Coder> getDefaultOutputCoder( kvCoder.getKeyCoder(), kvCoder.getValueCoder()); return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.populateDisplayData(builder, fn, fnClass); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java index a57d4465b208..1b64bb24434b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java @@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; import com.google.common.collect.ImmutableMap; @@ -52,7 +54,7 @@ public class CombineFnBase { * @param type of mutable accumulator values * @param type of output values */ - public interface GlobalCombineFn extends Serializable { + public interface GlobalCombineFn extends Serializable, HasDisplayData { /** * Returns the {@code Coder} to use for accumulator {@code AccumT} @@ -117,7 +119,8 @@ public Coder getDefaultOutputCoder(CoderRegistry registry, Coder type of mutable accumulator values * @param type of output values */ - public interface PerKeyCombineFn extends Serializable { + public interface PerKeyCombineFn + extends Serializable, HasDisplayData { /** * Returns the {@code Coder} to use for accumulator {@code AccumT} * values, or null if it is not able to be inferred. @@ -217,6 +220,16 @@ public TypeVariable getOutputTVariable() { return (TypeVariable) new TypeDescriptor(AbstractGlobalCombineFn.class) {}.getType(); } + + /** + * {@inheritDoc} + * + *

By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } } /** @@ -282,5 +295,15 @@ public TypeVariable getOutputTVariable() { return (TypeVariable) new TypeDescriptor(AbstractPerKeyCombineFn.class) {}.getType(); } + + /** + * {@inheritDoc} + * + *

By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index d98bd13222b9..6b2fb7074367 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -31,6 +31,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TupleTag; @@ -455,6 +457,13 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder } return new ComposedAccumulatorCoder(coders); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + for (CombineFn combineFn : combineFns) { + builder.include(combineFn); + } + } } /** @@ -588,6 +597,13 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder } return new ComposedAccumulatorCoder(coders); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + for (CombineFnWithContext combineFn : combineFnWithContexts) { + builder.include(combineFn); + } + } } /** @@ -769,6 +785,13 @@ public Coder getAccumulatorCoder( } return new ComposedAccumulatorCoder(coders); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + for (HasDisplayData keyedCombineFn : keyedCombineFns) { + builder.include(keyedCombineFn); + } + } } /** @@ -915,6 +938,13 @@ public Coder getAccumulatorCoder( } return new ComposedAccumulatorCoder(coders); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + for (HasDisplayData keyedCombineFn : keyedCombineFns) { + builder.include(keyedCombineFn); + } + } } ///////////////////////////////////////////////////////////////////////////// diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index 77a7e536f4d8..9bb4a01f1b2b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollectionView; /** @@ -167,6 +168,11 @@ public Coder getDefaultOutputCoder(CoderRegistry registry, Coder key public CombineFnWithContext forKey(K key, Coder keyCoder) { return CombineFnWithContext.this; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + CombineFnWithContext.this.populateDisplayData(builder); + } }; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java index 132d7f2271be..28749d731e2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.common.Counter; import org.apache.beam.sdk.util.common.Counter.AggregationKind; import org.apache.beam.sdk.util.common.CounterProvider; @@ -204,6 +205,11 @@ public T identity() { public T apply(T left, T right) { return comparator.compare(left, right) >= 0 ? left : right; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("comparer", comparator.getClass()); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java index dcee91f239f1..8f3082ee4dbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.common.Counter; import org.apache.beam.sdk.util.common.Counter.AggregationKind; import org.apache.beam.sdk.util.common.CounterProvider; @@ -204,6 +205,11 @@ public T identity() { public T apply(T left, T right) { return comparator.compare(left, right) <= 0 ? left : right; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("comparer", comparator.getClass()); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 1e621d4c5e71..6362bd43a465 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -153,6 +154,11 @@ public PCollection apply(PCollection in) { .of(new SampleAnyDoFn<>(limit, iterableView))) .setCoder(in.getCoder()); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("sampleSize", limit); + } } /** @@ -188,6 +194,7 @@ public static class FixedSizedSampleFn extends CombineFn, SerializableComparator>>, Iterable> { + private final int sampleSize; private final Top.TopCombineFn, SerializableComparator>> topCombineFn; private final Random rand = new Random(); @@ -196,6 +203,8 @@ private FixedSizedSampleFn(int sampleSize) { if (sampleSize < 0) { throw new IllegalArgumentException("sample size must be >= 0"); } + + this.sampleSize = sampleSize; topCombineFn = new Top.TopCombineFn, SerializableComparator>>( sampleSize, new KV.OrderByKey()); } @@ -244,5 +253,10 @@ public Coder> getDefaultOutputCoder( CoderRegistry registry, Coder inputCoder) { return IterableCoder.of(inputCoder); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("sampleSize", sampleSize); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 82747c2561f1..4b366bc22d63 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; import org.apache.beam.sdk.transforms.Combine.PerKey; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; @@ -392,6 +393,13 @@ public Coder> getAccumulatorCoder( return new BoundedHeapCoder<>(count, compareFn, inputCoder); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("count", count) + .add("comparer", compareFn.getClass()); + } + @Override public String getIncompatibleGlobalWindowErrorMessage() { return "Default values are not supported in Top.[of, smallest, largest]() if the output " diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 3aeed832b9b8..83a8c38eb647 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -128,7 +128,7 @@ public String toString() { return builder.toString(); } - private static String namespaceOf(Class clazz) { + private static String namespaceOf(JavaClass clazz) { return clazz.getName(); } @@ -150,6 +150,13 @@ public interface Builder { */ Builder include(HasDisplayData subComponent, Class namespace); + /** + * Register display metadata from the specified subcomponent, using the specified namespace. + * For example, a {@link ParDo} transform includes display metadata from the encapsulated + * {@link DoFn}. + */ + Builder include(HasDisplayData subComponent, JavaClass namespace); + /** * Register the given string display metadata. The metadata item will be registered with type * {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from @@ -288,6 +295,13 @@ ItemBuilder addIfNotDefault( */ ItemBuilder add(String key, Class value); + /** + * Register the given class display metadata. The metadata item will be registered with type + * {@link DisplayData.Type#JAVA_CLASS}, and is identified by the specified key and namespace + * from the current transform or component. + */ + ItemBuilder add(String key, JavaClass value); + /** * Register the given class display data if the value is not null. * @@ -295,6 +309,13 @@ ItemBuilder addIfNotDefault( */ ItemBuilder addIfNotNull(String key, @Nullable Class value); + /** + * Register the given class display data if the value is not null. + * + * @see DisplayData.Builder#add(String, JavaClass) + */ + ItemBuilder addIfNotNull(String key, @Nullable JavaClass value); + /** * Register the given class display data if the value is different than the specified default. * @@ -303,6 +324,13 @@ ItemBuilder addIfNotDefault( ItemBuilder addIfNotDefault( String key, @Nullable Class value, @Nullable Class defaultValue); + /** + * Register the given class display data if the value is different than the specified default. + * + * @see DisplayData.Builder#add(String, JavaClass) + */ + ItemBuilder addIfNotDefault( + String key, @Nullable JavaClass value, @Nullable JavaClass defaultValue); /** * Register the given display metadata with the specified type. * @@ -345,6 +373,14 @@ public interface ItemBuilder extends Builder { *

Leaving the namespace unspecified will default to the registering instance's class. */ ItemBuilder withNamespace(Class namespace); + + /** + * Adds an explicit namespace to the most-recently added display metadata. The namespace + * and key uniquely identify the display metadata. + * + *

Leaving the namespace unspecified will default to the registering instance's class. + */ + ItemBuilder withNamespace(JavaClass namespace); } /** @@ -362,7 +398,7 @@ public static class Item { private final String label; private final String url; - private static Item create(Class nsClass, String key, Type type, Object value) { + private static Item create(JavaClass nsClass, String key, Type type, Object value) { FormattedItemValue formatted = type.format(value); String namespace = namespaceOf(nsClass); return new Item( @@ -494,7 +530,7 @@ private Item withUrl(String url) { return new Item(this.ns, this.key, this.type, this.value, this.shortValue, url, this.label); } - private Item withNamespace(Class nsClass) { + private Item withNamespace(JavaClass nsClass) { String namespace = namespaceOf(nsClass); return new Item( namespace, this.key, this.type, this.value, this.shortValue, this.url, this.label); @@ -515,7 +551,7 @@ public static class Identifier { private final String ns; private final String key; - public static Identifier of(Class namespace, String key) { + public static Identifier of(JavaClass namespace, String key) { return of(namespaceOf(namespace), key); } @@ -608,7 +644,12 @@ FormattedItemValue format(Object value) { JAVA_CLASS { @Override FormattedItemValue format(Object value) { - Class clazz = checkType(value, Class.class, JAVA_CLASS); + if (value instanceof Class) { + JavaClass javaClass = JavaClass.of((Class) value); + return format(javaClass); + } + + JavaClass clazz = checkType(value, JavaClass.class, JAVA_CLASS); return new FormattedItemValue(clazz.getName(), clazz.getSimpleName()); } }; @@ -644,7 +685,7 @@ private static Type tryInferFrom(@Nullable Object value) { return TIMESTAMP; } else if (value instanceof Duration) { return DURATION; - } else if (value instanceof Class) { + } else if (value instanceof Class || value instanceof JavaClass) { return JAVA_CLASS; } else if (value instanceof String) { return STRING; @@ -680,7 +721,7 @@ private static class InternalBuilder implements ItemBuilder { private final Map entries; private final Set visited; - private Class latestNs; + private JavaClass latestNs; @Nullable private Item latestItem; @@ -704,13 +745,20 @@ public Builder include(HasDisplayData subComponent) { @Override public Builder include(HasDisplayData subComponent, Class namespace) { + checkNotNull(namespace); + + return include(subComponent, JavaClass.of(namespace)); + } + + @Override + public Builder include(HasDisplayData subComponent, JavaClass namespace) { checkNotNull(subComponent); checkNotNull(namespace); commitLatest(); boolean newComponent = visited.add(subComponent); if (newComponent) { - Class prevNs = this.latestNs; + JavaClass prevNs = this.latestNs; this.latestNs = namespace; subComponent.populateDisplayData(this); this.latestNs = prevNs; @@ -821,17 +869,34 @@ public ItemBuilder add(String key, Class value) { return addItemIf(true, key, Type.JAVA_CLASS, value); } + @Override + public ItemBuilder add(String key, JavaClass value) { + checkNotNull(value); + return addItemIf(true, key, Type.JAVA_CLASS, value); + } + @Override public ItemBuilder addIfNotNull(String key, @Nullable Class value) { return addItemIf(value != null, key, Type.JAVA_CLASS, value); } + @Override + public ItemBuilder addIfNotNull(String key, @Nullable JavaClass value) { + return addItemIf(value != null, key, Type.JAVA_CLASS, value); + } + @Override public ItemBuilder addIfNotDefault( String key, @Nullable Class value, @Nullable Class defaultValue) { return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value); } + @Override + public ItemBuilder addIfNotDefault( + String key, @Nullable JavaClass value, @Nullable JavaClass defaultValue) { + return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value); + } + @Override public ItemBuilder add(String key, Type type, Object value) { checkNotNull(value); @@ -887,6 +952,11 @@ public ItemBuilder withLinkUrl(@Nullable String url) { @Override public ItemBuilder withNamespace(Class namespace) { checkNotNull(namespace); + return withNamespace(JavaClass.of(namespace)); + } + + @Override + public ItemBuilder withNamespace(JavaClass namespace) { if (latestItem != null) { latestItem = latestItem.withNamespace(namespace); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java new file mode 100644 index 000000000000..4da92537e01a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.display; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import java.util.Objects; + + +/** + * Display metadata representing a Java class. + * + *

Java classes can be registered as display metadata via + * {@link DisplayData.Builder#add(String, JavaClass)}. {@link JavaClass} is serializable, unlike + * {@link Class} which can fail to serialize for Java 8 lambda functions. + */ +public class JavaClass implements Serializable { + private final String simpleName; + private final String name; + + private JavaClass(Class clazz) { + name = clazz.getName(); + simpleName = clazz.getSimpleName(); + } + + /** + * Create a {@link JavaClass} instance representing the specified class. + */ + public static JavaClass of(Class clazz) { + return new JavaClass(checkNotNull(clazz)); + } + + /** + * Create a {@link JavaClass} from the class of the specified object instance. + */ + public static JavaClass fromInstance(Object obj) { + checkNotNull(obj); + return new JavaClass(obj.getClass()); + } + + /** + * Retrieve the fully-qualified name of the class. + * + * @see Class#getName() + */ + public String getName() { + return name; + } + + /** + * Retrieve a simple representation of the class name. + * + * @see Class#getSimpleName() + */ + public String getSimpleName() { + return simpleName; + } + + @Override + public String toString() { + return name; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof JavaClass) { + JavaClass that = (JavaClass) obj; + return Objects.equals(this.name, that.name); + } + + return false; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java index fbb683c56587..34197f78f8fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.state.StateContext; import java.io.IOException; @@ -101,6 +102,10 @@ public Coder getDefaultOutputCoder( CoderRegistry registry, Coder inputCoder) throws CannotProvideCoderException { return combineFn.getDefaultOutputCoder(registry, inputCoder); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + combineFn.populateDisplayData(builder); + } }; } } @@ -150,6 +155,10 @@ public Coder getDefaultOutputCoder(CoderRegistry registry, Coder key Coder inputCoder) throws CannotProvideCoderException { return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + keyedCombineFn.populateDisplayData(builder); + } }; } } @@ -195,6 +204,10 @@ public Coder getDefaultOutputCoder(CoderRegistry registry, Coder key Coder inputCoder) throws CannotProvideCoderException { return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + combineFn.populateDisplayData(builder); + } private void writeObject(@SuppressWarnings("unused") ObjectOutputStream out) throws IOException { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java index 6d62e08e6d5b..1c3e205bb46d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java @@ -19,6 +19,8 @@ import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import org.apache.beam.sdk.Pipeline; @@ -29,6 +31,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -233,6 +236,16 @@ public void testAlternateComparator() { Arrays.asList("b", "aaa", "ccccc")); } + @Test + public void testDisplayData() { + Top.Largest comparer = new Top.Largest(); + PTransform approxQuanitiles = ApproximateQuantiles.globally(20, comparer); + DisplayData displayData = DisplayData.from(approxQuanitiles); + + assertThat(displayData, hasDisplayItem("numQuantiles", 20)); + assertThat(displayData, hasDisplayItem("comparer", comparer.getClass())); + } + private Matcher> quantileMatcher( int size, int numQuantiles, int absoluteError) { List> quantiles = new ArrayList<>(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index 3a4b81387fe3..c94c9f1fdd9a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,6 +29,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -290,4 +294,17 @@ public Void apply(Iterable> estimatePerKey) { return null; } } + + @Test + public void testDisplayData() { + ApproximateUnique.Globally specifiedSampleSize = ApproximateUnique.globally(1234); + ApproximateUnique.PerKey specifiedMaxError = ApproximateUnique.perKey(0.1234); + + assertThat(DisplayData.from(specifiedSampleSize), hasDisplayItem("sampleSize", 1234)); + + DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError); + assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234)); + assertThat("calculated sampleSize should be included", maxErrorDisplayData, + hasDisplayItem(hasKey("sampleSize"))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 0a025381a6da..f04708930460 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; @@ -35,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; import org.apache.beam.sdk.transforms.Min.MinIntegerFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -61,7 +63,7 @@ * Unit tests for {@link CombineFns}. */ @RunWith(JUnit4.class) -public class CombineFnsTest { +public class CombineFnsTest { @Rule public ExpectedException expectedException = ExpectedException.none(); @Test @@ -278,6 +280,63 @@ public void testComposedCombineNullValues() { p.run(); } + @Test + public void testComposedCombineDisplayData() { + SimpleFunction extractFn = new SimpleFunction() { + @Override + public String apply(String input) { + return input; + } + }; + + DisplayDataCombineFn combineFn1 = new DisplayDataCombineFn("combineFn1"); + DisplayDataCombineFn combineFn2 = new DisplayDataCombineFn("combineFn2"); + + CombineFns.ComposedCombineFn composedCombine = CombineFns.compose() + .with(extractFn, combineFn1, new TupleTag()) + .with(extractFn, combineFn2, new TupleTag()); + + DisplayData displayData = DisplayData.from(composedCombine); + assertThat(displayData, includes(combineFn1)); + assertThat(displayData, includes(combineFn2)); + } + + private static class DisplayDataCombineFn extends Combine.CombineFn { + private final String value; + private final String key; + private static int i; + + DisplayDataCombineFn(String value) { + this.key = "key" + (++i); + this.value = value; + } + + @Override + public String createAccumulator() { + return null; + } + + @Override + public String addInput(String accumulator, String input) { + return null; + } + + @Override + public String mergeAccumulators(Iterable accumulators) { + return null; + } + + @Override + public String extractOutput(String accumulator) { + return null; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(key, value); + } + } + private static class UserString implements Serializable { private String strValue; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 049baa3925e4..b71064111e28 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -18,7 +18,8 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.checkCombineFn; - +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static com.google.common.base.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; @@ -44,6 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -682,6 +684,24 @@ public void testCombineGetName() { Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName()); } + @Test + public void testDisplayData() { + UniqueInts combineFn = new UniqueInts() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("fnMetadata", "foobar"); + } + }; + Combine.Globally combine = Combine.globally(combineFn) + .withFanout(1234); + DisplayData displayData = DisplayData.from(combine); + + assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass())); + assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true)); + assertThat(displayData, hasDisplayItem("fanout", 1234)); + assertThat(displayData, includes(combineFn)); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java index 5722365d21f5..226255a6c2b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java @@ -18,9 +18,12 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.checkCombineFn; - +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import org.apache.beam.sdk.transforms.display.DisplayData; + import com.google.common.collect.Lists; import org.junit.Test; @@ -65,4 +68,12 @@ public void testMaxDoubleFn() { Lists.newArrayList(1.0, 2.0, 3.0, 4.0), 4.0); } + + @Test + public void testDisplayData() { + Top.Largest comparer = new Top.Largest<>(); + + Combine.Globally max = Max.globally(comparer); + assertThat(DisplayData.from(max), hasDisplayItem("comparer", comparer.getClass())); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java index 8f1d30181c51..d7ec3227b81f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java @@ -17,10 +17,13 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.collect.Lists; import org.junit.Test; @@ -65,4 +68,12 @@ public void testMinDoubleFn() { Lists.newArrayList(1.0, 2.0, 3.0, 4.0), 1.0); } + + @Test + public void testDisplayData() { + Top.Smallest comparer = new Top.Smallest<>(); + + Combine.Globally min = Min.globally(comparer); + assertThat(DisplayData.from(min), hasDisplayItem("comparer", comparer.getClass())); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index 0c2af3f4278e..4b1d5dc42a32 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -20,6 +20,8 @@ import static org.apache.beam.sdk.TestUtils.LINES; import static org.apache.beam.sdk.TestUtils.NO_LINES; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -29,6 +31,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.Joiner; @@ -260,4 +263,15 @@ public void testPickAnyWhenEmpty() { public void testSampleGetName() { assertEquals("Sample.SampleAny", Sample.any(1).getName()); } + + @Test + public void testDisplayData() { + PTransform sampleAny = Sample.any(1234); + DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny); + assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234)); + + PTransform samplePerKey = Sample.fixedSizePerKey(2345); + DisplayData perKeyDisplayData = DisplayData.from(samplePerKey); + assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java index 1815cc90503b..6d580e74f43f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import org.apache.beam.sdk.Pipeline; @@ -25,6 +27,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.Bound; @@ -233,6 +236,16 @@ public void testTopGetNames() { assertEquals("Largest.PerKey", Top.largestPerKey(2).getName()); } + @Test + public void testDisplayData() { + Top.Largest comparer = new Top.Largest(); + Combine.Globally> top = Top.of(1234, comparer); + DisplayData displayData = DisplayData.from(top); + + assertThat(displayData, hasDisplayItem("count", 1234)); + assertThat(displayData, hasDisplayItem("comparer", comparer.getClass())); + } + private static class OrderByLength implements Comparator, Serializable { @Override public int compare(String a, String b) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index 8cfb5c20ec6a..87e707b096e7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -194,9 +194,10 @@ protected void describeMismatchSafely( private DisplayDataComparision checkSubset( DisplayData displayData, DisplayData included, Class namespace) { DisplayDataComparision comparison = new DisplayDataComparision(displayData.items()); + JavaClass javaClass = JavaClass.of(namespace); for (Item item : included.items()) { Item matchedItem = displayData.asMap().get( - DisplayData.Identifier.of(namespace, item.getKey())); + DisplayData.Identifier.of(javaClass, item.getKey())); if (matchedItem != null) { comparison.matched(matchedItem); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 5aee8dd8aa76..bb55cc146483 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -340,7 +340,7 @@ public void testNullNamespaceOverride() { @Override public void populateDisplayData(Builder builder) { builder.add("foo", "bar") - .withNamespace(null); + .withNamespace((Class) null); } }); } @@ -349,10 +349,10 @@ public void populateDisplayData(Builder builder) { public void testIdentifierEquality() { new EqualsTester() .addEqualityGroup( - DisplayData.Identifier.of(DisplayDataTest.class, "1"), - DisplayData.Identifier.of(DisplayDataTest.class, "1")) - .addEqualityGroup(DisplayData.Identifier.of(Object.class, "1")) - .addEqualityGroup(DisplayData.Identifier.of(DisplayDataTest.class, "2")) + DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "1"), + DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "1")) + .addEqualityGroup(DisplayData.Identifier.of(JavaClass.of(Object.class), "1")) + .addEqualityGroup(DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "2")) .testEquals(); } @@ -568,6 +568,7 @@ public void populateDisplayData(DisplayData.Builder builder) { .add("float", 3.14) .add("boolean", true) .add("java_class", DisplayDataTest.class) + .add("java_class2", JavaClass.of(DisplayDataTest.class)) .add("timestamp", Instant.now()) .add("duration", Duration.standardHours(1)); } @@ -583,6 +584,9 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat( items, hasItem(allOf(hasKey("java_class"), hasType(DisplayData.Type.JAVA_CLASS)))); + assertThat( + items, + hasItem(allOf(hasKey("java_class2"), hasType(DisplayData.Type.JAVA_CLASS)))); assertThat( items, hasItem(allOf(hasKey("timestamp"), hasType(DisplayData.Type.TIMESTAMP)))); @@ -678,6 +682,8 @@ public void testKnownTypeInference() { assertEquals(DisplayData.Type.BOOLEAN, DisplayData.inferType(true)); assertEquals(DisplayData.Type.TIMESTAMP, DisplayData.inferType(Instant.now())); assertEquals(DisplayData.Type.DURATION, DisplayData.inferType(Duration.millis(1234))); + assertEquals(DisplayData.Type.JAVA_CLASS, + DisplayData.inferType(JavaClass.of(DisplayDataTest.class))); assertEquals(DisplayData.Type.JAVA_CLASS, DisplayData.inferType(DisplayDataTest.class)); assertEquals(DisplayData.Type.STRING, DisplayData.inferType("hello world")); @@ -773,7 +779,7 @@ public void populateDisplayData(Builder builder) { DisplayData.from(new HasDisplayData() { @Override public void populateDisplayData(Builder builder) { - builder.include(subComponent, null); + builder.include(subComponent, (JavaClass) null); } }); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java new file mode 100644 index 000000000000..d0be89787d82 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.display; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.SerializableUtils; +import com.google.common.testing.EqualsTester; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link JavaClass}. + */ +@RunWith(JUnit4.class) +public class JavaClassTest { + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void testProperties() { + JavaClass thisClass = JavaClass.of(JavaClassTest.class); + assertEquals(JavaClassTest.class.getName(), thisClass.getName()); + assertEquals(JavaClassTest.class.getSimpleName(), thisClass.getSimpleName()); + } + + @Test + public void testInputValidation() { + thrown.expect(NullPointerException.class); + JavaClass.of(null); + } + + @Test + public void testEquality() { + new EqualsTester() + .addEqualityGroup(JavaClass.of(JavaClassTest.class), JavaClass.fromInstance(this)) + .addEqualityGroup(JavaClass.of(JavaClass.class)) + .addEqualityGroup(JavaClass.of(Class.class)) + .testEquals(); + } + + @Test + public void testSerialization() { + SerializableUtils.ensureSerializable(JavaClass.of(JavaClassTest.class)); + } +} diff --git a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java new file mode 100644 index 000000000000..e46e4adeffde --- /dev/null +++ b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.transforms.display; + +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.JavaClass; +import org.apache.beam.sdk.util.SerializableUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 tests for {@link JavaClass}. + */ +@RunWith(JUnit4.class) +public class JavaClassJava8Test implements Serializable { + @Test + public void testLambdaClassSerialization() { + final SerializableFunction f = x -> x; + Serializable myClass = new Serializable() { + // Class references for lambdas do not serialize, which is why we support JavaClass + // Class clazz = f.getClass(); + JavaClass javaClass = JavaClass.fromInstance(f); + }; + + SerializableUtils.ensureSerializable(myClass); + } +} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java index 00fc087d3365..768cb8ffab97 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java @@ -17,12 +17,21 @@ */ package org.apache.beam.sdk.transforms; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import com.google.common.collect.Iterables; + +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -131,4 +140,31 @@ public void testCombinePerKeyInstanceMethodReference() { KV.of("c", 4)); pipeline.run(); } + + /** + * Tests that we can serialize Combine fns constructed from a lambda. Lambdas can be problematic + * because the {@link Class} object is synthetic and cannot be deserialized. + */ + @Test + public void testLambdaSerialization() { + SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); + + try { + SerializableUtils.clone(combiner.getClass()); + Assert.fail("Expected lambda class serialization to fail. " + + "If it's fixed, we can remove special behavior in Combine."); + } catch (IllegalArgumentException e) { + // Expected + } + + Combine.Globally combine = Combine.globally(combiner); + SerializableUtils.clone(combine); // should not throw. + } + + @Test + public void testLambdaDisplayData() { + Combine.Globally combine = Combine.globally(xs -> Iterables.getFirst(xs, 0)); + DisplayData displayData = DisplayData.from(combine); + assertThat(displayData.items(), not(empty())); + } } From d6f7bbfa4aa1008a52114cc8edeba48601b6d844 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 14:09:43 -0700 Subject: [PATCH 02/12] fixup! javadoc cleanup --- .../java/org/apache/beam/sdk/transforms/ApproximateUnique.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java index c7812e4d4d49..c04e4c49ba90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java @@ -169,7 +169,7 @@ static class Globally extends PTransform, PCollection> { private final long sampleSize; /** - * The the desired maximum estimation error, or null if not specified. + * The desired maximum estimation error or null if not specified. */ private final Double maximumEstimationError; From a8262b88387f2d20ed320979e0c71c59cb371864 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 14:10:43 -0700 Subject: [PATCH 03/12] fixup! Add Nullable annotation --- .../org/apache/beam/sdk/transforms/ApproximateUnique.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java index c04e4c49ba90..175897b08a5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java @@ -32,6 +32,8 @@ import com.google.common.hash.HashingOutputStream; import com.google.common.io.ByteStreams; +import org.apache.avro.reflect.Nullable; + import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -171,6 +173,7 @@ static class Globally extends PTransform, PCollection> { /** * The desired maximum estimation error or null if not specified. */ + @Nullable private final Double maximumEstimationError; /** @@ -235,8 +238,9 @@ static class PerKey private final long sampleSize; /** - * The the desired maximum estimation error, if specified. + * The the desired maximum estimation error or null if not specified. */ + @Nullable private final Double maximumEstimationError; /** From 3b728e18735da60fc92cf5cf1a0a7eda0c4aea8d Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 15:49:41 -0700 Subject: [PATCH 04/12] fixup! If more than one combineFn have the same type, add a sequential suffix --- .../beam/sdk/transforms/CombineFns.java | 59 +++++++++++++++---- .../sdk/transforms/display/DisplayData.java | 37 ++++++++---- .../beam/sdk/transforms/CombineFnsTest.java | 23 +++++--- .../display/DisplayDataMatchers.java | 52 +++++++++------- 4 files changed, 120 insertions(+), 51 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 6b2fb7074367..a78c5ef68859 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -37,10 +37,13 @@ import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -49,6 +52,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -460,9 +464,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder @Override public void populateDisplayData(DisplayData.Builder builder) { - for (CombineFn combineFn : combineFns) { - builder.include(combineFn); - } + CombineFns.populateDisplayData(builder, combineFns); } } @@ -600,9 +602,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder @Override public void populateDisplayData(DisplayData.Builder builder) { - for (CombineFnWithContext combineFn : combineFnWithContexts) { - builder.include(combineFn); - } + CombineFns.populateDisplayData(builder, combineFnWithContexts); } } @@ -788,9 +788,7 @@ public Coder getAccumulatorCoder( @Override public void populateDisplayData(DisplayData.Builder builder) { - for (HasDisplayData keyedCombineFn : keyedCombineFns) { - builder.include(keyedCombineFn); - } + CombineFns.populateDisplayData(builder, keyedCombineFns); } } @@ -941,9 +939,7 @@ public Coder getAccumulatorCoder( @Override public void populateDisplayData(DisplayData.Builder builder) { - for (HasDisplayData keyedCombineFn : keyedCombineFns) { - builder.include(keyedCombineFn); - } + CombineFns.populateDisplayData(builder, keyedCombineFns); } } @@ -1038,4 +1034,43 @@ private static void checkUniqueness( "Cannot compose with tuple tag %s because it is already present in the composition.", outputTag); } + + /** + * Populate display data for the {@code combineFns} that make up a composed combine transform. + * + *

The same combineFn class may be used multiple times, in which case we must take special care + * to register display data with unique namespaces. + */ + private static void populateDisplayData( + DisplayData.Builder builder, List combineFns) { + + // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type. + Multimap, HasDisplayData> combineFnMap = ArrayListMultimap.create(); + + for (int i = 0; i < combineFns.size(); i++) { + HasDisplayData combineFn = combineFns.get(i); + builder.add("combineFn" + (i + 1), combineFn.getClass()); + combineFnMap.put(combineFn.getClass(), combineFn); + } + + for (Map.Entry, Collection> combineFnEntries : + combineFnMap.asMap().entrySet()) { + + Collection classCombineFns = combineFnEntries.getValue(); + if (classCombineFns.size() == 1) { + // Only one combineFn of this type, include it directly. + builder.include(Iterables.getOnlyElement(classCombineFns)); + + } else { + // Multiple combineFns of same type, add a namespace suffix so display data is + // unique and ordered. + String baseNamespace = combineFnEntries.getKey().getName(); + for (int i = 0; i < combineFns.size(); i++) { + HasDisplayData combineFn = combineFns.get(i); + String namespace = String.format("%s$%d", baseNamespace, i + 1); + builder.include(combineFn, namespace); + } + } + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 83a8c38eb647..984693957726 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -138,25 +138,34 @@ private static String namespaceOf(JavaClass clazz) { */ public interface Builder { /** - * Register display metadata from the specified subcomponent. For example, a {@link ParDo} - * transform includes display metadata from the encapsulated {@link DoFn}. + * Register display metadata from the specified subcomponent. + * + * @see #include(HasDisplayData, String) */ Builder include(HasDisplayData subComponent); /** * Register display metadata from the specified subcomponent, using the specified namespace. - * For example, a {@link ParDo} transform includes display metadata from the encapsulated - * {@link DoFn}. + * + * @see #include(HasDisplayData, String) */ Builder include(HasDisplayData subComponent, Class namespace); /** * Register display metadata from the specified subcomponent, using the specified namespace. - * For example, a {@link ParDo} transform includes display metadata from the encapsulated - * {@link DoFn}. + * + * @see #include(HasDisplayData, String) */ Builder include(HasDisplayData subComponent, JavaClass namespace); + /** + * Register display metadata from the specified subcomponent, using the specified namespace. + * + *

For example, a {@link ParDo} transform includes display metadata from the encapsulated + * {@link DoFn}. + */ + Builder include(HasDisplayData subComponent, String namespace); + /** * Register the given string display metadata. The metadata item will be registered with type * {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from @@ -398,11 +407,10 @@ public static class Item { private final String label; private final String url; - private static Item create(JavaClass nsClass, String key, Type type, Object value) { + private static Item create(String nsClass, String key, Type type, Object value) { FormattedItemValue formatted = type.format(value); - String namespace = namespaceOf(nsClass); return new Item( - namespace, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null); + nsClass, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null); } private Item( @@ -721,7 +729,7 @@ private static class InternalBuilder implements ItemBuilder { private final Map entries; private final Set visited; - private JavaClass latestNs; + private String latestNs; @Nullable private Item latestItem; @@ -746,19 +754,24 @@ public Builder include(HasDisplayData subComponent) { @Override public Builder include(HasDisplayData subComponent, Class namespace) { checkNotNull(namespace); - return include(subComponent, JavaClass.of(namespace)); } @Override public Builder include(HasDisplayData subComponent, JavaClass namespace) { + checkNotNull(namespace); + return include(subComponent, namespaceOf(namespace)); + } + + @Override + public Builder include(HasDisplayData subComponent, String namespace) { checkNotNull(subComponent); checkNotNull(namespace); commitLatest(); boolean newComponent = visited.add(subComponent); if (newComponent) { - JavaClass prevNs = this.latestNs; + String prevNs = this.latestNs; this.latestNs = namespace; subComponent.populateDisplayData(this); this.latestNs = prevNs; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index f04708930460..84ab9cc3abda 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; + +import static org.hamcrest.Matchers.anyOf; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; @@ -289,25 +292,29 @@ public String apply(String input) { } }; - DisplayDataCombineFn combineFn1 = new DisplayDataCombineFn("combineFn1"); - DisplayDataCombineFn combineFn2 = new DisplayDataCombineFn("combineFn2"); + DisplayDataCombineFn combineFn1 = new DisplayDataCombineFn("value1"); + DisplayDataCombineFn combineFn2 = new DisplayDataCombineFn("value2"); CombineFns.ComposedCombineFn composedCombine = CombineFns.compose() .with(extractFn, combineFn1, new TupleTag()) .with(extractFn, combineFn2, new TupleTag()); DisplayData displayData = DisplayData.from(composedCombine); - assertThat(displayData, includes(combineFn1)); - assertThat(displayData, includes(combineFn2)); + assertThat(displayData, hasDisplayItem("combineFn1", combineFn1.getClass())); + assertThat(displayData, hasDisplayItem("combineFn2", combineFn2.getClass())); + + String nsBase = DisplayDataCombineFn.class.getName(); + assertThat(displayData, includes(combineFn1, nsBase + "$1")); + assertThat(displayData, includes(combineFn2, nsBase + "$2")); } private static class DisplayDataCombineFn extends Combine.CombineFn { private final String value; - private final String key; private static int i; + private final int id; DisplayDataCombineFn(String value) { - this.key = "key" + (++i); + id = ++i; this.value = value; } @@ -333,7 +340,9 @@ public String extractOutput(String accumulator) { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.add(key, value); + builder + .add("uniqueKey" + id, value) + .add("sharedKey", value); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index 87e707b096e7..866021a0ef80 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -149,40 +149,42 @@ protected boolean matchesSafely(DisplayData data, Description mismatchDescriptio } } - /** - * Create a matcher that matches if the examined {@link DisplayData} contains all display data - * registered from the specified subcomponent. - */ - public static Matcher includes(final HasDisplayData subComponent) { + /** @see #includes(HasDisplayData, String) */ + public static Matcher includes(HasDisplayData subComponent) { return includes(subComponent, subComponent.getClass()); } + /** @see #includes(HasDisplayData, String) */ + public static Matcher includes( + HasDisplayData subComponent, Class namespace) { + return includes(subComponent, namespace.getClass().getName()); + } + /** * Create a matcher that matches if the examined {@link DisplayData} contains all display data * registered from the specified subcomponent and namespace. */ public static Matcher includes( - final HasDisplayData subComponent, final Class namespace) { + final HasDisplayData subComponent, final String namespace) { return new CustomTypeSafeMatcher("includes subcomponent") { @Override protected boolean matchesSafely(DisplayData displayData) { - DisplayData subComponentData = DisplayData.from(subComponent); + DisplayData subComponentData = subComponentData(); if (subComponentData.items().size() == 0) { throw new UnsupportedOperationException("subComponent contains no display data; " + "cannot verify whether it is included"); } - DisplayDataComparision comparison = checkSubset(displayData, subComponentData, namespace); + DisplayDataComparison comparison = checkSubset(displayData, subComponentData); return comparison.missingItems.isEmpty(); } - @Override protected void describeMismatchSafely( DisplayData displayData, Description mismatchDescription) { - DisplayData subComponentDisplayData = DisplayData.from(subComponent); - DisplayDataComparision comparison = checkSubset( - displayData, subComponentDisplayData, subComponent.getClass()); + DisplayData subComponentDisplayData = subComponentData(); + DisplayDataComparison comparison = checkSubset( + displayData, subComponentDisplayData); mismatchDescription .appendText("did not include:\n") @@ -191,13 +193,21 @@ protected void describeMismatchSafely( .appendValue(comparison.unmatchedItems); } - private DisplayDataComparision checkSubset( - DisplayData displayData, DisplayData included, Class namespace) { - DisplayDataComparision comparison = new DisplayDataComparision(displayData.items()); - JavaClass javaClass = JavaClass.of(namespace); + private DisplayData subComponentData() { + return DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.include(subComponent, namespace); + } + }); + } + + private DisplayDataComparison checkSubset( + DisplayData displayData, DisplayData included) { + DisplayDataComparison comparison = new DisplayDataComparison(displayData.items()); for (Item item : included.items()) { Item matchedItem = displayData.asMap().get( - DisplayData.Identifier.of(javaClass, item.getKey())); + DisplayData.Identifier.of(item.getNamespace(), item.getKey())); if (matchedItem != null) { comparison.matched(matchedItem); @@ -209,11 +219,11 @@ private DisplayDataComparision checkSubset( return comparison; } - class DisplayDataComparision { + class DisplayDataComparison { Collection missingItems; Collection unmatchedItems; - DisplayDataComparision(Collection superset) { + DisplayDataComparison(Collection superset) { missingItems = Sets.newHashSet(); unmatchedItems = Sets.newHashSet(superset); } @@ -316,7 +326,9 @@ public static Matcher hasValue(Matcher valueMatcher) { valueMatcher, "with value", "value") { @Override protected T featureValueOf(DisplayData.Item actual) { - return (T) actual.getValue(); + @SuppressWarnings("unchecked") + T value = (T) actual.getValue(); + return value; } }; } From 6816b605d80a9a6acb90e0d97297a17490c224a9 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 15:57:12 -0700 Subject: [PATCH 05/12] fixup! Rename JavaClass to ClassForDisplay --- .../apache/beam/sdk/transforms/Combine.java | 61 ++++++++++--------- .../{JavaClass.java => ClassForDisplay.java} | 25 ++++---- .../sdk/transforms/display/DisplayData.java | 42 ++++++------- .../beam/sdk/transforms/CombineFnsTest.java | 1 - ...lassTest.java => ClassForDisplayTest.java} | 21 ++++--- .../transforms/display/DisplayDataTest.java | 14 ++--- ...est.java => ClassForDisplayJava8Test.java} | 10 +-- 7 files changed, 87 insertions(+), 87 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/{JavaClass.java => ClassForDisplay.java} (71%) rename sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/{JavaClassTest.java => ClassForDisplayTest.java} (69%) rename sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/{JavaClassJava8Test.java => ClassForDisplayJava8Test.java} (84%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 1eece1787f82..3566fa52533e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -36,9 +36,9 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; +import org.apache.beam.sdk.transforms.display.ClassForDisplay; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.display.JavaClass; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -103,7 +103,7 @@ private Combine() { */ public static Globally globally( SerializableFunction, V> combiner) { - return globally(IterableCombineFn.of(combiner), JavaClass.fromInstance(combiner)); + return globally(IterableCombineFn.of(combiner), ClassForDisplay.fromInstance(combiner)); } @@ -124,11 +124,11 @@ public static Globally globally( */ public static Globally globally( GlobalCombineFn fn) { - return globally(fn, JavaClass.fromInstance(fn)); + return globally(fn, ClassForDisplay.fromInstance(fn)); } private static Globally globally( - GlobalCombineFn fn, JavaClass fnClass) { + GlobalCombineFn fn, ClassForDisplay fnClass) { return new Globally<>(fn, fnClass, true, 0); } @@ -150,7 +150,7 @@ private static Globally globally( */ public static PerKey perKey( SerializableFunction, V> fn) { - return perKey(IterableCombineFn.of(fn).asKeyedFn(), JavaClass.fromInstance(fn)); + return perKey(IterableCombineFn.of(fn).asKeyedFn(), ClassForDisplay.fromInstance(fn)); } /** @@ -171,7 +171,7 @@ public static PerKey perKey( */ public static PerKey perKey( GlobalCombineFn fn) { - return perKey(fn.asKeyedFn(), JavaClass.fromInstance(fn)); + return perKey(fn.asKeyedFn(), ClassForDisplay.fromInstance(fn)); } /** @@ -192,11 +192,11 @@ public static PerKey perKey( */ public static PerKey perKey( PerKeyCombineFn fn) { - return perKey(fn, JavaClass.fromInstance(fn)); + return perKey(fn, ClassForDisplay.fromInstance(fn)); } private static PerKey perKey( - PerKeyCombineFn fn, JavaClass fnClass) { + PerKeyCombineFn fn, ClassForDisplay fnClass) { return new PerKey<>(fn, fnClass, false /*fewKeys*/); } @@ -205,7 +205,7 @@ private static PerKey perKey( * in {@link GroupByKey}. */ private static PerKey fewKeys( - PerKeyCombineFn fn, JavaClass fnClass) { + PerKeyCombineFn fn, ClassForDisplay fnClass) { return new PerKey<>(fn, fnClass, true /*fewKeys*/); } @@ -232,7 +232,7 @@ private static PerKey fewKeys( */ public static GroupedValues groupedValues( SerializableFunction, V> fn) { - return groupedValues(IterableCombineFn.of(fn).asKeyedFn(), JavaClass.fromInstance(fn)); + return groupedValues(IterableCombineFn.of(fn).asKeyedFn(), ClassForDisplay.fromInstance(fn)); } /** @@ -258,7 +258,7 @@ public static GroupedValues groupedValues( */ public static GroupedValues groupedValues( GlobalCombineFn fn) { - return groupedValues(fn.asKeyedFn(), JavaClass.fromInstance(fn)); + return groupedValues(fn.asKeyedFn(), ClassForDisplay.fromInstance(fn)); } /** @@ -284,11 +284,11 @@ public static GroupedValues groupedValu */ public static GroupedValues groupedValues( PerKeyCombineFn fn) { - return groupedValues(fn, JavaClass.fromInstance(fn)); + return groupedValues(fn, ClassForDisplay.fromInstance(fn)); } private static GroupedValues groupedValues( - PerKeyCombineFn fn, JavaClass fnClass) { + PerKeyCombineFn fn, ClassForDisplay fnClass) { return new GroupedValues<>(fn, fnClass); } @@ -1260,12 +1260,12 @@ public static class Globally extends PTransform, PCollection> { private final GlobalCombineFn fn; - private final JavaClass fnClass; + private final ClassForDisplay fnClass; private final boolean insertDefault; private final int fanout; private final List> sideInputs; - private Globally(GlobalCombineFn fn, JavaClass fnClass, + private Globally(GlobalCombineFn fn, ClassForDisplay fnClass, boolean insertDefault, int fanout) { this.fn = fn; this.fnClass = fnClass; @@ -1275,7 +1275,7 @@ private Globally(GlobalCombineFn fn, JavaClass fnCla } private Globally(String name, GlobalCombineFn fn, - JavaClass fnClass, boolean insertDefault, int fanout) { + ClassForDisplay fnClass, boolean insertDefault, int fanout) { super(name); this.fn = fn; this.fnClass = fnClass; @@ -1285,7 +1285,8 @@ private Globally(String name, GlobalCombineFn fn, } private Globally(String name, GlobalCombineFn fn, - JavaClass fnClass, boolean insertDefault, int fanout, List> sideInputs) { + ClassForDisplay fnClass, boolean insertDefault, int fanout, + List> sideInputs) { super(name); this.fn = fn; this.fnClass = fnClass; @@ -1408,7 +1409,7 @@ public void processElement(DoFn.ProcessContext c) { } private static void populateDisplayData( - DisplayData.Builder builder, HasDisplayData fn, JavaClass fnClass) { + DisplayData.Builder builder, HasDisplayData fn, ClassForDisplay fnClass) { builder .include(fn, fnClass) .add("combineFn", fnClass); @@ -1464,12 +1465,12 @@ public static class GloballyAsSingletonView extends PTransform, PCollectionView> { private final GlobalCombineFn fn; - private final JavaClass fnClass; + private final ClassForDisplay fnClass; private final boolean insertDefault; private final int fanout; private GloballyAsSingletonView( - GlobalCombineFn fn, JavaClass fnClass, + GlobalCombineFn fn, ClassForDisplay fnClass, boolean insertDefault, int fanout) { this.fn = fn; this.fnClass = fnClass; @@ -1666,12 +1667,12 @@ public static class PerKey extends PTransform>, PCollection>> { private final transient PerKeyCombineFn fn; - private final JavaClass fnClass; + private final ClassForDisplay fnClass; private final boolean fewKeys; private final List> sideInputs; private PerKey( - PerKeyCombineFn fn, JavaClass fnClass, + PerKeyCombineFn fn, ClassForDisplay fnClass, boolean fewKeys) { this.fn = fn; this.fnClass = fnClass; @@ -1680,7 +1681,7 @@ private PerKey( } private PerKey(String name, - PerKeyCombineFn fn, JavaClass fnClass, + PerKeyCombineFn fn, ClassForDisplay fnClass, boolean fewKeys, List> sideInputs) { super(name); this.fn = fn; @@ -1690,8 +1691,8 @@ private PerKey(String name, } private PerKey( - String name, PerKeyCombineFn fn, JavaClass fnClass, - boolean fewKeys) { + String name, PerKeyCombineFn fn, + ClassForDisplay fnClass, boolean fewKeys) { super(name); this.fn = fn; this.fnClass = fnClass; @@ -1782,12 +1783,12 @@ public static class PerKeyWithHotKeyFanout extends PTransform>, PCollection>> { private final transient PerKeyCombineFn fn; - private final JavaClass fnClass; + private final ClassForDisplay fnClass; private final SerializableFunction hotKeyFanout; private PerKeyWithHotKeyFanout(String name, PerKeyCombineFn fn, - JavaClass fnClass, + ClassForDisplay fnClass, SerializableFunction hotKeyFanout) { super(name); this.fn = fn; @@ -2221,11 +2222,11 @@ public static class GroupedValues PCollection>> { private final PerKeyCombineFn fn; - private final JavaClass fnClass; + private final ClassForDisplay fnClass; private final List> sideInputs; private GroupedValues( - PerKeyCombineFn fn, JavaClass fnClass) { + PerKeyCombineFn fn, ClassForDisplay fnClass) { this.fn = SerializableUtils.clone(fn); this.fnClass = fnClass; this.sideInputs = ImmutableList.>of(); @@ -2233,7 +2234,7 @@ private GroupedValues( private GroupedValues( PerKeyCombineFn fn, - JavaClass fnClass, + ClassForDisplay fnClass, List> sideInputs) { this.fn = SerializableUtils.clone(fn); this.fnClass = fnClass; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java similarity index 71% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java index 4da92537e01a..455d6e19ad98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/JavaClass.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java @@ -22,36 +22,35 @@ import java.io.Serializable; import java.util.Objects; - /** * Display metadata representing a Java class. * *

Java classes can be registered as display metadata via - * {@link DisplayData.Builder#add(String, JavaClass)}. {@link JavaClass} is serializable, unlike - * {@link Class} which can fail to serialize for Java 8 lambda functions. + * {@link DisplayData.Builder#add(String, ClassForDisplay)}. {@link ClassForDisplay} is + * serializable, unlike {@link Class} which can fail to serialize for Java 8 lambda functions. */ -public class JavaClass implements Serializable { +public class ClassForDisplay implements Serializable { private final String simpleName; private final String name; - private JavaClass(Class clazz) { + private ClassForDisplay(Class clazz) { name = clazz.getName(); simpleName = clazz.getSimpleName(); } /** - * Create a {@link JavaClass} instance representing the specified class. + * Create a {@link ClassForDisplay} instance representing the specified class. */ - public static JavaClass of(Class clazz) { - return new JavaClass(checkNotNull(clazz)); + public static ClassForDisplay of(Class clazz) { + return new ClassForDisplay(checkNotNull(clazz)); } /** - * Create a {@link JavaClass} from the class of the specified object instance. + * Create a {@link ClassForDisplay} from the class of the specified object instance. */ - public static JavaClass fromInstance(Object obj) { + public static ClassForDisplay fromInstance(Object obj) { checkNotNull(obj); - return new JavaClass(obj.getClass()); + return new ClassForDisplay(obj.getClass()); } /** @@ -84,8 +83,8 @@ public int hashCode() { @Override public boolean equals(Object obj) { - if (obj instanceof JavaClass) { - JavaClass that = (JavaClass) obj; + if (obj instanceof ClassForDisplay) { + ClassForDisplay that = (ClassForDisplay) obj; return Objects.equals(this.name, that.name); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 984693957726..9d7c11577814 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -128,7 +128,7 @@ public String toString() { return builder.toString(); } - private static String namespaceOf(JavaClass clazz) { + private static String namespaceOf(ClassForDisplay clazz) { return clazz.getName(); } @@ -156,7 +156,7 @@ public interface Builder { * * @see #include(HasDisplayData, String) */ - Builder include(HasDisplayData subComponent, JavaClass namespace); + Builder include(HasDisplayData subComponent, ClassForDisplay namespace); /** * Register display metadata from the specified subcomponent, using the specified namespace. @@ -309,7 +309,7 @@ ItemBuilder addIfNotDefault( * {@link DisplayData.Type#JAVA_CLASS}, and is identified by the specified key and namespace * from the current transform or component. */ - ItemBuilder add(String key, JavaClass value); + ItemBuilder add(String key, ClassForDisplay value); /** * Register the given class display data if the value is not null. @@ -321,9 +321,9 @@ ItemBuilder addIfNotDefault( /** * Register the given class display data if the value is not null. * - * @see DisplayData.Builder#add(String, JavaClass) + * @see DisplayData.Builder#add(String, ClassForDisplay) */ - ItemBuilder addIfNotNull(String key, @Nullable JavaClass value); + ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value); /** * Register the given class display data if the value is different than the specified default. @@ -336,10 +336,10 @@ ItemBuilder addIfNotDefault( /** * Register the given class display data if the value is different than the specified default. * - * @see DisplayData.Builder#add(String, JavaClass) + * @see DisplayData.Builder#add(String, ClassForDisplay) */ ItemBuilder addIfNotDefault( - String key, @Nullable JavaClass value, @Nullable JavaClass defaultValue); + String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue); /** * Register the given display metadata with the specified type. * @@ -389,7 +389,7 @@ public interface ItemBuilder extends Builder { * *

Leaving the namespace unspecified will default to the registering instance's class. */ - ItemBuilder withNamespace(JavaClass namespace); + ItemBuilder withNamespace(ClassForDisplay namespace); } /** @@ -538,7 +538,7 @@ private Item withUrl(String url) { return new Item(this.ns, this.key, this.type, this.value, this.shortValue, url, this.label); } - private Item withNamespace(JavaClass nsClass) { + private Item withNamespace(ClassForDisplay nsClass) { String namespace = namespaceOf(nsClass); return new Item( namespace, this.key, this.type, this.value, this.shortValue, this.url, this.label); @@ -559,7 +559,7 @@ public static class Identifier { private final String ns; private final String key; - public static Identifier of(JavaClass namespace, String key) { + public static Identifier of(ClassForDisplay namespace, String key) { return of(namespaceOf(namespace), key); } @@ -653,11 +653,11 @@ FormattedItemValue format(Object value) { @Override FormattedItemValue format(Object value) { if (value instanceof Class) { - JavaClass javaClass = JavaClass.of((Class) value); - return format(javaClass); + ClassForDisplay classForDisplay = ClassForDisplay.of((Class) value); + return format(classForDisplay); } - JavaClass clazz = checkType(value, JavaClass.class, JAVA_CLASS); + ClassForDisplay clazz = checkType(value, ClassForDisplay.class, JAVA_CLASS); return new FormattedItemValue(clazz.getName(), clazz.getSimpleName()); } }; @@ -693,7 +693,7 @@ private static Type tryInferFrom(@Nullable Object value) { return TIMESTAMP; } else if (value instanceof Duration) { return DURATION; - } else if (value instanceof Class || value instanceof JavaClass) { + } else if (value instanceof Class || value instanceof ClassForDisplay) { return JAVA_CLASS; } else if (value instanceof String) { return STRING; @@ -754,11 +754,11 @@ public Builder include(HasDisplayData subComponent) { @Override public Builder include(HasDisplayData subComponent, Class namespace) { checkNotNull(namespace); - return include(subComponent, JavaClass.of(namespace)); + return include(subComponent, ClassForDisplay.of(namespace)); } @Override - public Builder include(HasDisplayData subComponent, JavaClass namespace) { + public Builder include(HasDisplayData subComponent, ClassForDisplay namespace) { checkNotNull(namespace); return include(subComponent, namespaceOf(namespace)); } @@ -883,7 +883,7 @@ public ItemBuilder add(String key, Class value) { } @Override - public ItemBuilder add(String key, JavaClass value) { + public ItemBuilder add(String key, ClassForDisplay value) { checkNotNull(value); return addItemIf(true, key, Type.JAVA_CLASS, value); } @@ -894,7 +894,7 @@ public ItemBuilder addIfNotNull(String key, @Nullable Class value) { } @Override - public ItemBuilder addIfNotNull(String key, @Nullable JavaClass value) { + public ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value) { return addItemIf(value != null, key, Type.JAVA_CLASS, value); } @@ -906,7 +906,7 @@ public ItemBuilder addIfNotDefault( @Override public ItemBuilder addIfNotDefault( - String key, @Nullable JavaClass value, @Nullable JavaClass defaultValue) { + String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue) { return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value); } @@ -965,11 +965,11 @@ public ItemBuilder withLinkUrl(@Nullable String url) { @Override public ItemBuilder withNamespace(Class namespace) { checkNotNull(namespace); - return withNamespace(JavaClass.of(namespace)); + return withNamespace(ClassForDisplay.of(namespace)); } @Override - public ItemBuilder withNamespace(JavaClass namespace) { + public ItemBuilder withNamespace(ClassForDisplay namespace) { if (latestItem != null) { latestItem = latestItem.withNamespace(namespace); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 84ab9cc3abda..90b001f0f24a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -20,7 +20,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; -import static org.hamcrest.Matchers.anyOf; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java similarity index 69% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java index d0be89787d82..19f56c60b751 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/JavaClassTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java @@ -29,37 +29,38 @@ import org.junit.runners.JUnit4; /** - * Unit tests for {@link JavaClass}. + * Unit tests for {@link ClassForDisplay}. */ @RunWith(JUnit4.class) -public class JavaClassTest { +public class ClassForDisplayTest { @Rule public final ExpectedException thrown = ExpectedException.none(); @Test public void testProperties() { - JavaClass thisClass = JavaClass.of(JavaClassTest.class); - assertEquals(JavaClassTest.class.getName(), thisClass.getName()); - assertEquals(JavaClassTest.class.getSimpleName(), thisClass.getSimpleName()); + ClassForDisplay thisClass = ClassForDisplay.of(ClassForDisplayTest.class); + assertEquals(ClassForDisplayTest.class.getName(), thisClass.getName()); + assertEquals(ClassForDisplayTest.class.getSimpleName(), thisClass.getSimpleName()); } @Test public void testInputValidation() { thrown.expect(NullPointerException.class); - JavaClass.of(null); + ClassForDisplay.of(null); } @Test public void testEquality() { new EqualsTester() - .addEqualityGroup(JavaClass.of(JavaClassTest.class), JavaClass.fromInstance(this)) - .addEqualityGroup(JavaClass.of(JavaClass.class)) - .addEqualityGroup(JavaClass.of(Class.class)) + .addEqualityGroup( + ClassForDisplay.of(ClassForDisplayTest.class), ClassForDisplay.fromInstance(this)) + .addEqualityGroup(ClassForDisplay.of(ClassForDisplay.class)) + .addEqualityGroup(ClassForDisplay.of(Class.class)) .testEquals(); } @Test public void testSerialization() { - SerializableUtils.ensureSerializable(JavaClass.of(JavaClassTest.class)); + SerializableUtils.ensureSerializable(ClassForDisplay.of(ClassForDisplayTest.class)); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index bb55cc146483..106c44139b5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -349,10 +349,10 @@ public void populateDisplayData(Builder builder) { public void testIdentifierEquality() { new EqualsTester() .addEqualityGroup( - DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "1"), - DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "1")) - .addEqualityGroup(DisplayData.Identifier.of(JavaClass.of(Object.class), "1")) - .addEqualityGroup(DisplayData.Identifier.of(JavaClass.of(DisplayDataTest.class), "2")) + DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1"), + DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1")) + .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(Object.class), "1")) + .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "2")) .testEquals(); } @@ -568,7 +568,7 @@ public void populateDisplayData(DisplayData.Builder builder) { .add("float", 3.14) .add("boolean", true) .add("java_class", DisplayDataTest.class) - .add("java_class2", JavaClass.of(DisplayDataTest.class)) + .add("java_class2", ClassForDisplay.of(DisplayDataTest.class)) .add("timestamp", Instant.now()) .add("duration", Duration.standardHours(1)); } @@ -683,7 +683,7 @@ public void testKnownTypeInference() { assertEquals(DisplayData.Type.TIMESTAMP, DisplayData.inferType(Instant.now())); assertEquals(DisplayData.Type.DURATION, DisplayData.inferType(Duration.millis(1234))); assertEquals(DisplayData.Type.JAVA_CLASS, - DisplayData.inferType(JavaClass.of(DisplayDataTest.class))); + DisplayData.inferType(ClassForDisplay.of(DisplayDataTest.class))); assertEquals(DisplayData.Type.JAVA_CLASS, DisplayData.inferType(DisplayDataTest.class)); assertEquals(DisplayData.Type.STRING, DisplayData.inferType("hello world")); @@ -779,7 +779,7 @@ public void populateDisplayData(Builder builder) { DisplayData.from(new HasDisplayData() { @Override public void populateDisplayData(Builder builder) { - builder.include(subComponent, (JavaClass) null); + builder.include(subComponent, (ClassForDisplay) null); } }); } diff --git a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java similarity index 84% rename from sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java rename to sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java index e46e4adeffde..0bb229652c57 100644 --- a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/JavaClassJava8Test.java +++ b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java @@ -18,7 +18,7 @@ package com.google.cloud.dataflow.sdk.transforms.display; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.JavaClass; +import org.apache.beam.sdk.transforms.display.ClassForDisplay; import org.apache.beam.sdk.util.SerializableUtils; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,17 +27,17 @@ import java.io.Serializable; /** - * Java 8 tests for {@link JavaClass}. + * Java 8 tests for {@link ClassForDisplay}. */ @RunWith(JUnit4.class) -public class JavaClassJava8Test implements Serializable { +public class ClassForDisplayJava8Test implements Serializable { @Test public void testLambdaClassSerialization() { final SerializableFunction f = x -> x; Serializable myClass = new Serializable() { - // Class references for lambdas do not serialize, which is why we support JavaClass + // Class references for lambdas do not serialize, which is why we support ClassForDisplay // Class clazz = f.getClass(); - JavaClass javaClass = JavaClass.fromInstance(f); + ClassForDisplay javaClass = ClassForDisplay.fromInstance(f); }; SerializableUtils.ensureSerializable(myClass); From b92ffd1ccbc4a452c2e4e5aa473259afa21e4bf3 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 16:09:30 -0700 Subject: [PATCH 06/12] fixup! improve javadoc --- .../sdk/transforms/display/ClassForDisplayJava8Test.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java index 0bb229652c57..e8955b3dc545 100644 --- a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java +++ b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java @@ -36,6 +36,7 @@ public void testLambdaClassSerialization() { final SerializableFunction f = x -> x; Serializable myClass = new Serializable() { // Class references for lambdas do not serialize, which is why we support ClassForDisplay + // Specifically, the following would not work: // Class clazz = f.getClass(); ClassForDisplay javaClass = ClassForDisplay.fromInstance(f); }; From 33766525c4360de390057183c29b620d997ea113 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 16:10:36 -0700 Subject: [PATCH 07/12] fixup! improve javadoc --- .../org/apache/beam/sdk/transforms/CombineJava8Test.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java index 768cb8ffab97..3910b21b2f5e 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java @@ -142,8 +142,9 @@ public void testCombinePerKeyInstanceMethodReference() { } /** - * Tests that we can serialize Combine fns constructed from a lambda. Lambdas can be problematic - * because the {@link Class} object is synthetic and cannot be deserialized. + * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda. + * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be + * deserialized. */ @Test public void testLambdaSerialization() { From 053497a695322332d47da90772361368d23825e1 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 14 Apr 2016 16:16:44 -0700 Subject: [PATCH 08/12] fixup! Use Assume rather than Assert to test assumptions --- .../apache/beam/sdk/transforms/CombineJava8Test.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java index 3910b21b2f5e..132247b2474b 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java @@ -31,7 +31,7 @@ import com.google.common.collect.Iterables; -import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -150,13 +150,18 @@ public void testCombinePerKeyInstanceMethodReference() { public void testLambdaSerialization() { SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); + boolean lambdaClassSerializationThrows; try { SerializableUtils.clone(combiner.getClass()); - Assert.fail("Expected lambda class serialization to fail. " - + "If it's fixed, we can remove special behavior in Combine."); + lambdaClassSerializationThrows = false; } catch (IllegalArgumentException e) { // Expected + lambdaClassSerializationThrows = true; } + Assume.assumeTrue("Expected lambda class serialization to fail. " + + "If it's fixed, we can remove special behavior in Combine.", + lambdaClassSerializationThrows); + Combine.Globally combine = Combine.globally(combiner); SerializableUtils.clone(combine); // should not throw. From ab7677819f1db198c8ec131a86b382a91039fd58 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 15 Apr 2016 10:10:19 -0700 Subject: [PATCH 09/12] fixup! Change the suffix separator in CombineFns display data --- .../main/java/org/apache/beam/sdk/transforms/CombineFns.java | 2 +- .../java/org/apache/beam/sdk/transforms/CombineFnsTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index a78c5ef68859..ed454987e8ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -1067,7 +1067,7 @@ private static void populateDisplayData( String baseNamespace = combineFnEntries.getKey().getName(); for (int i = 0; i < combineFns.size(); i++) { HasDisplayData combineFn = combineFns.get(i); - String namespace = String.format("%s$%d", baseNamespace, i + 1); + String namespace = String.format("%s#%d", baseNamespace, i + 1); builder.include(combineFn, namespace); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 90b001f0f24a..e66f13a5611d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -303,8 +303,8 @@ public String apply(String input) { assertThat(displayData, hasDisplayItem("combineFn2", combineFn2.getClass())); String nsBase = DisplayDataCombineFn.class.getName(); - assertThat(displayData, includes(combineFn1, nsBase + "$1")); - assertThat(displayData, includes(combineFn2, nsBase + "$2")); + assertThat(displayData, includes(combineFn1, nsBase + "#1")); + assertThat(displayData, includes(combineFn2, nsBase + "#2")); } private static class DisplayDataCombineFn extends Combine.CombineFn { From 6170e4cc0d2d525609272739d0cb28005cd68099 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 15 Apr 2016 10:13:09 -0700 Subject: [PATCH 10/12] fixup! javadoc formatting --- .../org/apache/beam/sdk/transforms/display/DisplayData.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 9d7c11577814..6065dc486180 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -161,7 +161,7 @@ public interface Builder { /** * Register display metadata from the specified subcomponent, using the specified namespace. * - *

For example, a {@link ParDo} transform includes display metadata from the encapsulated + *

For example, a {@link ParDo} transform includes display metadata from the encapsulated * {@link DoFn}. */ Builder include(HasDisplayData subComponent, String namespace); From 93e920c1619b3263748eac65ad0d59c1d52c608b Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 15 Apr 2016 10:15:45 -0700 Subject: [PATCH 11/12] fixup! fix introduced bug in DisplayDataMatchers --- .../apache/beam/sdk/transforms/display/DisplayDataMatchers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index 866021a0ef80..abdc350cddd7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -157,7 +157,7 @@ public static Matcher includes(HasDisplayData subComponent) { /** @see #includes(HasDisplayData, String) */ public static Matcher includes( HasDisplayData subComponent, Class namespace) { - return includes(subComponent, namespace.getClass().getName()); + return includes(subComponent, namespace.getName()); } /** From fa54c698205b9af6b451ceaa32c42dec6eaaa9b9 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 15 Apr 2016 10:44:11 -0700 Subject: [PATCH 12/12] fixup! checkstyle --- .../sdk/transforms/display/ClassForDisplayJava8Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java index e8955b3dc545..4db235fd9347 100644 --- a/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java +++ b/sdks/java/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/ClassForDisplayJava8Test.java @@ -36,7 +36,7 @@ public void testLambdaClassSerialization() { final SerializableFunction f = x -> x; Serializable myClass = new Serializable() { // Class references for lambdas do not serialize, which is why we support ClassForDisplay - // Specifically, the following would not work: + // Specifically, the following would not work: // Class clazz = f.getClass(); ClassForDisplay javaClass = ClassForDisplay.fromInstance(f); };