From 99bea04455935cc5b3f220eacf38038282874e4d Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 28 Aug 2019 20:47:29 -0700 Subject: [PATCH 01/12] add assert that always fails to check if CloudObjectsTest ever runs --- .../org/apache/beam/runners/dataflow/util/CloudObjectsTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 215567e10797..ff7bd3a0e833 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -156,6 +156,7 @@ public static Iterable> data() { @Test public void toAndFromCloudObject() throws Exception { + assertEquals(true, false); CloudObject cloudObject = CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null); Coder fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); @@ -165,6 +166,7 @@ public void toAndFromCloudObject() throws Exception { @Test public void toAndFromCloudObjectWithSdkComponents() throws Exception { + assertEquals(true, false); SdkComponents sdkComponents = SdkComponents.create(); CloudObject cloudObject = CloudObjects.asCloudObject(coder, sdkComponents); Coder fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); From 71b4531dcf950d233ea911a3abaced610146d428 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 30 Aug 2019 17:01:55 -0700 Subject: [PATCH 02/12] Remove forced failures and add Enclosed annotation --- .../apache/beam/runners/dataflow/util/CloudObjectsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index ff7bd3a0e833..ef2b8ae52caf 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -63,6 +63,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.Builder; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; @@ -70,6 +71,7 @@ import org.junit.runners.Parameterized.Parameters; /** Tests for {@link CloudObjects}. */ +@RunWith(Enclosed.class) public class CloudObjectsTest { /** Tests that all of the Default Coders are tested. */ @RunWith(JUnit4.class) @@ -156,7 +158,6 @@ public static Iterable> data() { @Test public void toAndFromCloudObject() throws Exception { - assertEquals(true, false); CloudObject cloudObject = CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null); Coder fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); @@ -166,7 +167,6 @@ public void toAndFromCloudObject() throws Exception { @Test public void toAndFromCloudObjectWithSdkComponents() throws Exception { - assertEquals(true, false); SdkComponents sdkComponents = SdkComponents.create(); CloudObject cloudObject = CloudObjects.asCloudObject(coder, sdkComponents); Coder fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); From 8fb836ba1eb56b56d37dfb00cc6ae54b73fc8ca1 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 30 Aug 2019 17:07:38 -0700 Subject: [PATCH 03/12] Fix some simple test failures --- .../runners/dataflow/util/CloudObjectsTest.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index ef2b8ae52caf..9307793c716c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -17,17 +17,19 @@ */ package org.apache.beam.runners.dataflow.util; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -62,6 +64,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.Builder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.junit.Test; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; @@ -185,12 +188,18 @@ private static void checkPipelineProtoCoderIds( assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID)); assertEquals( sdkComponents.registerCoder(coder), - cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID)); + ((CloudObject) cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID)) + .get(PropertyNames.VALUE)); } List> coderArguments = coder.getCoderArguments(); Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS); - assertTrue(cloudComponentsObject instanceof List); - List cloudComponents = (List) cloudComponentsObject; + List cloudComponents; + if (cloudComponentsObject == null) { + cloudComponents = Lists.newArrayList(); + } else { + assertThat(cloudComponentsObject, instanceOf(List.class)); + cloudComponents = (List) cloudComponentsObject; + } assertEquals(coderArguments.size(), cloudComponents.size()); for (int i = 0; i < coderArguments.size(); i++) { checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents); From d9f298ee1b53690f60d4f6d5f7a3de1cb89e5e05 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 3 Sep 2019 16:46:34 -0700 Subject: [PATCH 04/12] Use components rather than coder arguments for structured coders --- .../runners/dataflow/util/CloudObjectsTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 9307793c716c..af6a7a634941 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -191,7 +191,12 @@ private static void checkPipelineProtoCoderIds( ((CloudObject) cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID)) .get(PropertyNames.VALUE)); } - List> coderArguments = coder.getCoderArguments(); + List> expectedComponents; + if (coder instanceof StructuredCoder) { + expectedComponents = ((StructuredCoder)coder).getComponents(); + } else { + expectedComponents = coder.getCoderArguments(); + } Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS); List cloudComponents; if (cloudComponentsObject == null) { @@ -200,9 +205,10 @@ private static void checkPipelineProtoCoderIds( assertThat(cloudComponentsObject, instanceOf(List.class)); cloudComponents = (List) cloudComponentsObject; } - assertEquals(coderArguments.size(), cloudComponents.size()); - for (int i = 0; i < coderArguments.size(); i++) { - checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents); + assertEquals(expectedComponents.size(), cloudComponents.size()); + for (int i = 0; i < expectedComponents.size(); i++) { + checkPipelineProtoCoderIds(expectedComponents.get(i), cloudComponents.get(i), sdkComponents); + } } } From a6c768a91ac610961d3599c40fbb4066fe95bf5f Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 13:13:10 -0700 Subject: [PATCH 05/12] Add StringUtf8Coder, DoubleCoder to the list of Dataflow known coders --- .../org/apache/beam/runners/dataflow/util/CloudObjects.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index ec51351a8783..fe294bc25f56 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -29,9 +29,11 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; @@ -49,6 +51,8 @@ private CloudObjects() {} ByteArrayCoder.class, KvCoder.class, VarLongCoder.class, + DoubleCoder.class, + StringUtf8Coder.class, IntervalWindowCoder.class, IterableCoder.class, Timer.Coder.class, From b1a07f77150d6d535cc8e546ebc44da91c7630c6 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 16:47:11 -0700 Subject: [PATCH 06/12] add equals and hashCode to SchemaCoder/RowCoder --- .../org/apache/beam/sdk/coders/RowCoder.java | 18 ++++++++++++++++ .../apache/beam/sdk/schemas/SchemaCoder.java | 21 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index f6cfe6ac34ae..79faa915b824 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -247,4 +248,21 @@ public String toString() { String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder(); return string; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RowCoder rowCoder = (RowCoder) o; + return schema.equals(rowCoder.schema); + } + + @Override + public int hashCode() { + return Objects.hash(schema); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index 0199534c3aba..a6511d759d8d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.CustomCoder; @@ -100,4 +101,24 @@ public boolean consistentWithEquals() { public String toString() { return "SchemaCoder: " + rowCoder.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaCoder that = (SchemaCoder) o; + // Two SchemaCoders are considered equal if their schemas are equal *and* their typeDescriptors + // (representing type T) are equal + return rowCoder.equals(that.rowCoder) + && getEncodedTypeDescriptor().equals(that.getEncodedTypeDescriptor()); + } + + @Override + public int hashCode() { + return Objects.hash(rowCoder, getEncodedTypeDescriptor()); + } } From bd773b81982bed270548974644e2c156d74bd949 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 16:59:13 -0700 Subject: [PATCH 07/12] !fixup remove unused import --- .../org/apache/beam/runners/dataflow/util/CloudObjectsTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index af6a7a634941..be8af6b123ca 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -29,7 +29,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; From 1da2f1e3acff6da5ac0670a2721ee1c7d01723cf Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 17:06:46 -0700 Subject: [PATCH 08/12] !fixup formatting --- .../apache/beam/runners/dataflow/util/CloudObjectsTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index be8af6b123ca..eecb1cd21b24 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -192,7 +192,7 @@ private static void checkPipelineProtoCoderIds( } List> expectedComponents; if (coder instanceof StructuredCoder) { - expectedComponents = ((StructuredCoder)coder).getComponents(); + expectedComponents = ((StructuredCoder) coder).getComponents(); } else { expectedComponents = coder.getCoderArguments(); } @@ -206,8 +206,8 @@ private static void checkPipelineProtoCoderIds( } assertEquals(expectedComponents.size(), cloudComponents.size()); for (int i = 0; i < expectedComponents.size(); i++) { - checkPipelineProtoCoderIds(expectedComponents.get(i), cloudComponents.get(i), sdkComponents); - + checkPipelineProtoCoderIds( + expectedComponents.get(i), cloudComponents.get(i), sdkComponents); } } } From b858ccfccc640d82c8805e321e476a46dd461bd0 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 17:16:24 -0700 Subject: [PATCH 09/12] Add SchemaCoder test with an actual schema --- .../dataflow/util/CloudObjectsTest.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index eecb1cd21b24..2b470c625aeb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -51,7 +51,9 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.LogicalTypes; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; @@ -75,6 +77,20 @@ /** Tests for {@link CloudObjects}. */ @RunWith(Enclosed.class) public class CloudObjectsTest { + private static final Schema TEST_SCHEMA = + Schema.builder() + .addBooleanField("bool") + .addByteField("int8") + .addInt16Field("int16") + .addInt32Field("int32") + .addInt64Field("int64") + .addFloatField("float") + .addDoubleField("double") + .addStringField("string") + .addArrayField("list_int32", FieldType.INT32) + .addLogicalTypeField("fixed_bytes", LogicalTypes.FixedBytes.of(4)) + .build(); + /** Tests that all of the Default Coders are tested. */ @RunWith(JUnit4.class) public static class DefaultsPresentTest { @@ -147,7 +163,8 @@ public static Iterable> data() { CoGbkResultSchema.of( ImmutableList.of(new TupleTag(), new TupleTag())), UnionCoder.of(ImmutableList.of(VarLongCoder.of(), ByteArrayCoder.of())))) - .add(SchemaCoder.of(Schema.builder().build())); + .add(SchemaCoder.of(Schema.builder().build())) + .add(SchemaCoder.of(TEST_SCHEMA)); for (Class atomicCoder : DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) { dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build()); From 5528e0dc0842b3fd0416042bab48522c9a283f9d Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 6 Sep 2019 12:44:10 -0700 Subject: [PATCH 10/12] Remove StringUtf8Coder from dataflow known coders --- .../org/apache/beam/runners/dataflow/util/CloudObjects.java | 2 -- .../apache/beam/runners/dataflow/util/CloudObjectsTest.java | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index fe294bc25f56..ef47e1fdd15f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; @@ -52,7 +51,6 @@ private CloudObjects() {} KvCoder.class, VarLongCoder.class, DoubleCoder.class, - StringUtf8Coder.class, IntervalWindowCoder.class, IterableCoder.class, Timer.Coder.class, diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 2b470c625aeb..26b721cdd060 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -33,7 +33,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.beam.runners.core.construction.ModelCoderRegistrar; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -198,7 +197,7 @@ public void toAndFromCloudObjectWithSdkComponents() throws Exception { private static void checkPipelineProtoCoderIds( Coder coder, CloudObject cloudObject, SdkComponents sdkComponents) throws Exception { - if (ModelCoderRegistrar.isKnownCoder(coder)) { + if (CloudObjects.DATAFLOW_KNOWN_CODERS.contains(coder.getClass())) { assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID)); } else { assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID)); From 2177bcb2568efbfbce7e904f335f651fb07570be Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 6 Sep 2019 12:46:08 -0700 Subject: [PATCH 11/12] SchemaCoder.equals compares fromRow/toRow, make row identity funcs have good equals --- .../apache/beam/sdk/schemas/SchemaCoder.java | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index a6511d759d8d..e4a6e6c6e648 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.Row; /** {@link SchemaCoder} is used as the coder for types that have schemas registered. */ @@ -59,7 +58,7 @@ public static SchemaCoder of( /** Returns a {@link SchemaCoder} for {@link Row} classes. */ public static SchemaCoder of(Schema schema) { return new SchemaCoder<>( - schema, SerializableFunctions.identity(), SerializableFunctions.identity()); + schema, identity(), identity()); } /** Returns the schema associated with this type. */ @@ -111,14 +110,37 @@ public boolean equals(Object o) { return false; } SchemaCoder that = (SchemaCoder) o; - // Two SchemaCoders are considered equal if their schemas are equal *and* their typeDescriptors - // (representing type T) are equal - return rowCoder.equals(that.rowCoder) - && getEncodedTypeDescriptor().equals(that.getEncodedTypeDescriptor()); + return rowCoder.equals(that.rowCoder) && + toRowFunction.equals(that.toRowFunction) && + fromRowFunction.equals(that.fromRowFunction); } @Override public int hashCode() { - return Objects.hash(rowCoder, getEncodedTypeDescriptor()); + return Objects.hash(rowCoder, toRowFunction, fromRowFunction); + } + + private static RowIdentity identity() { + return new RowIdentity(); + } + + private static class RowIdentity implements SerializableFunction { + @Override + public Row apply(Row input) { + return input; + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } } } From 29a6efcceca2778a86213cabc0aa57970168d572 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 6 Sep 2019 13:12:17 -0700 Subject: [PATCH 12/12] formatting --- .../java/org/apache/beam/sdk/schemas/SchemaCoder.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index e4a6e6c6e648..9e06b4420fc0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -57,8 +57,7 @@ public static SchemaCoder of( /** Returns a {@link SchemaCoder} for {@link Row} classes. */ public static SchemaCoder of(Schema schema) { - return new SchemaCoder<>( - schema, identity(), identity()); + return new SchemaCoder<>(schema, identity(), identity()); } /** Returns the schema associated with this type. */ @@ -110,9 +109,9 @@ public boolean equals(Object o) { return false; } SchemaCoder that = (SchemaCoder) o; - return rowCoder.equals(that.rowCoder) && - toRowFunction.equals(that.toRowFunction) && - fromRowFunction.equals(that.fromRowFunction); + return rowCoder.equals(that.rowCoder) + && toRowFunction.equals(that.toRowFunction) + && fromRowFunction.equals(that.fromRowFunction); } @Override @@ -124,7 +123,7 @@ private static RowIdentity identity() { return new RowIdentity(); } - private static class RowIdentity implements SerializableFunction { + private static class RowIdentity implements SerializableFunction { @Override public Row apply(Row input) { return input;