From d2e5106045ea18db91dae7b30893f06a71f388da Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:12:10 +0300 Subject: [PATCH 01/11] add coder readable files --- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java new file mode 100644 index 000000000000..824630b045b1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MetadataCoderV2; + +/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +public class ReadableFileCoder extends AtomicCoder { + private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); + + /** Returns the instance of {@link ReadableFileCoder}. */ + public static ReadableFileCoder of() { + return INSTANCE; + } + + @Override + public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { + MetadataCoderV2.of().encode(value.getMetadata(), os); + VarIntCoder.of().encode(value.getCompression().ordinal(), os); + } + + @Override + public FileIO.ReadableFile decode(InputStream is) throws IOException { + MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); + Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + return new FileIO.ReadableFile(metadata, compression); + } +} From 058400d4d954c3b9ce615edced06629926559a8a Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:12:10 +0300 Subject: [PATCH 02/11] add coder readable files --- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java new file mode 100644 index 000000000000..db8258166f14 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MetadataCoderV2; + +/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +public class ReadableFileCoderV2 extends AtomicCoder { + private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); + + /** Returns the instance of {@link ReadableFileCoder}. */ + public static ReadableFileCoderV2 of() { + return INSTANCE; + } + + @Override + public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { + MetadataCoderV2.of().encode(value.getMetadata(), os); + VarIntCoder.of().encode(value.getCompression().ordinal(), os); + } + + @Override + public FileIO.ReadableFile decode(InputStream is) throws IOException { + MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); + Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + return new FileIO.ReadableFile(metadata, compression); + } +} From 4dec390c48ffb621f2d09d385771b13a9eaa8ed1 Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:50:10 +0300 Subject: [PATCH 03/11] :sdks:java:core:spotlessApply --- .../java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java index db8258166f14..36e8ecbc0385 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -26,7 +26,10 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MetadataCoderV2; -/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +/** + * A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link + * Metadata#lastModifiedMillis()}. + */ public class ReadableFileCoderV2 extends AtomicCoder { private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); From a224a66e76c49ead497b953dceab5899fa1df309 Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 22:57:51 +0300 Subject: [PATCH 04/11] [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder --- .../apache/beam/sdk/io/ReadableFileCoder.java | 12 +++-- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 53 ------------------- 2 files changed, 9 insertions(+), 56 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 51bb83ea9cc0..9f629b415a91 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -24,26 +24,32 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MetadataCoder; /** A {@link Coder} for {@link FileIO.ReadableFile}. */ public class ReadableFileCoder extends AtomicCoder { private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); + private final AtomicCoder metadataCoder; /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { + this.metadataCoder = MetadataCoder.of(); + return INSTANCE; + } + + public static ReadableFileCoder of(AtomicCoder metadataCoder) { + this.metadataCoder = metadataCoder; return INSTANCE; } @Override public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - MetadataCoder.of().encode(value.getMetadata(), os); + metadataCoder.encode(value.getMetadata(), os); VarIntCoder.of().encode(value.getCompression().ordinal(), os); } @Override public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = MetadataCoder.of().decode(is); + MatchResult.Metadata metadata = metadataCoder.decode(is); Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; return new FileIO.ReadableFile(metadata, compression); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java deleted file mode 100644 index 36e8ecbc0385..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MetadataCoderV2; - -/** - * A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link - * Metadata#lastModifiedMillis()}. - */ -public class ReadableFileCoderV2 extends AtomicCoder { - private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); - - /** Returns the instance of {@link ReadableFileCoder}. */ - public static ReadableFileCoderV2 of() { - return INSTANCE; - } - - @Override - public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - MetadataCoderV2.of().encode(value.getMetadata(), os); - VarIntCoder.of().encode(value.getCompression().ordinal(), os); - } - - @Override - public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); - Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; - return new FileIO.ReadableFile(metadata, compression); - } -} From 1e3e452d9fc2879e620a63171a3f88f42947a40e Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 23:06:33 +0300 Subject: [PATCH 05/11] [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder --- .../java/org/apache/beam/sdk/io/ReadableFileCoder.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 9f629b415a91..7895db760ee5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -24,20 +24,22 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MetadataCoder; /** A {@link Coder} for {@link FileIO.ReadableFile}. */ public class ReadableFileCoder extends AtomicCoder { private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); - private final AtomicCoder metadataCoder; + private AtomicCoder metadataCoder; /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { - this.metadataCoder = MetadataCoder.of(); + INSTANCE.metadataCoder = MetadataCoder.of(); return INSTANCE; } public static ReadableFileCoder of(AtomicCoder metadataCoder) { - this.metadataCoder = metadataCoder; + INSTANCE.metadataCoder = metadataCoder; return INSTANCE; } From 6cdddfe02600de7e248089013bc8b2c0eb494289 Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 23:32:31 +0300 Subject: [PATCH 06/11] [BEAM-12883] add ability t0 set custom MetadataCoder using StructuredCoder --- .../apache/beam/sdk/io/ReadableFileCoder.java | 62 +++++++++++++------ 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 7895db760ee5..ee6e4a6bd081 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -20,39 +20,63 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.*; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; -/** A {@link Coder} for {@link FileIO.ReadableFile}. */ -public class ReadableFileCoder extends AtomicCoder { - private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); - private AtomicCoder metadataCoder; +/** A {@link Coder} for {@link org.apache.beam.sdk.io.FileIO.ReadableFile}. */ +public class ReadableFileCoder extends StructuredCoder { + + private final Coder metadataCoder; + + public static ReadableFileCoder of(Coder metadataCoder) { + return new ReadableFileCoder(metadataCoder); + } - /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { - INSTANCE.metadataCoder = MetadataCoder.of(); - return INSTANCE; + return new ReadableFileCoder(MetadataCoder.of()); } - public static ReadableFileCoder of(AtomicCoder metadataCoder) { - INSTANCE.metadataCoder = metadataCoder; - return INSTANCE; + public Coder getMetadataCoder() { + return metadataCoder; + } + + private ReadableFileCoder(Coder metadataCoder) { + this.metadataCoder = metadataCoder; } @Override - public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - metadataCoder.encode(value.getMetadata(), os); - VarIntCoder.of().encode(value.getCompression().ordinal(), os); + public void encode( + FileIO.ReadableFile value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws CoderException, IOException { + getMetadataCoder().encode(value.getMetadata(), outStream); + VarIntCoder.of().encode(value.getCompression().ordinal(), outStream); } @Override - public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = metadataCoder.decode(is); - Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + public FileIO.ReadableFile decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws CoderException, IOException { + MatchResult.Metadata metadata = getMetadataCoder().decode(inStream); + Compression compression = Compression.values()[VarIntCoder.of().decode(inStream)]; return new FileIO.ReadableFile(metadata, compression); } + + @Override + public @UnknownKeyFor @NonNull @Initialized List> getCoderArguments() { + return Collections.singletonList(metadataCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + // ignore the default Metadata coder for backward compatible + if (!getMetadataCoder().equals(MetadataCoder.of())) { + verifyDeterministic(this, "Metadata coder must be deterministic", getMetadataCoder()); + } + } } From 6c0906aecd1d864fa4d12ffe05467a63ab0c3ede Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 5 Oct 2021 11:26:06 +0300 Subject: [PATCH 07/11] [BEAM-12883] remove asterisk-based import --- .../main/java/org/apache/beam/sdk/io/ReadableFileCoder.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index ee6e4a6bd081..bdf10c47eef3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -22,7 +22,10 @@ import java.io.OutputStream; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder; From d9f6f004af6856f4645deddfb1a846c5d26465ee Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 10 Oct 2021 11:31:21 +0300 Subject: [PATCH 08/11] [BEAM-12883] add ability for MetadataCoder to support encode-decode for any new field that added to Metadata --- .../beam/sdk/io/fs/MetadataDynamicCoder.java | 97 +++++++++++++++++++ .../sdk/io/fs/MetadataDynamicCoderTest.java | 78 +++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java new file mode 100644 index 000000000000..088fc0f67018 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.transforms.SerializableBiConsumer; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MetadataDynamicCoder extends StructuredCoder { + + private static final MetadataCoder V1_CODER = MetadataCoder.of(); + + private List> coders = new ArrayList<>(); + private List> getters = new ArrayList<>(); + private List> setters = + new ArrayList<>(); + + public MetadataDynamicCoder() {} + + public MetadataDynamicCoder withCoderForField( + Coder coder, + SerializableFunction getter, + SerializableBiConsumer setter) { + coders.add(coder); + getters.add(getter); + setters.add(setter); + return this; + } + + @Override + public void encode(MatchResult.Metadata metadata, OutputStream outStream) + throws CoderException, IOException { + V1_CODER.encode(metadata, outStream); + for (int i = 0; i < coders.size(); i++) { + SerializableFunction getter = getters.get(i); + Coder coder = coders.get(i); + try { + coder.encode(getter.apply(metadata), outStream); + } catch (IOException e) { + throw new RuntimeException( + "Failed to encode " + getter.toString() + " with coder " + coder.getClass()); + } + } + } + + @Override + public MatchResult.Metadata decode(InputStream inStream) throws CoderException, IOException { + MatchResult.Metadata.Builder builder = V1_CODER.decodeBuilder(inStream); + + for (int i = 0; i < coders.size(); i++) { + Coder coder = coders.get(i); + BiConsumer setter = setters.get(i); + + try { + setter.accept(builder, coder.decode(inStream)); + } catch (Exception e) { + throw new RuntimeException("Failed to decode with coder " + coder.getClass()); + } + } + return builder.build(); + } + + @Override + public List> getCoderArguments() { + return coders; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + for (Coder coder : getCoderArguments()) { + verifyDeterministic(this, "Coder must be deterministic " + coder.getClass(), coder); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java new file mode 100644 index 000000000000..40ad943ae289 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.nio.file.Path; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Tests for {@link org.apache.beam.sdk.io.fs.MetadataDynamicCoder}. */ +public class MetadataDynamicCoderTest { + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .build(); + + MetadataDynamicCoder metadataCoderDynamic = getMetadataDynamicCoder(); + + CoderProperties.coderDecodeEncodeEqual(metadataCoderDynamic, metadata); + } + + @Test + public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(1541097000L) + .build(); + MetadataDynamicCoder metadataCoderDynamic = getMetadataDynamicCoder(); + + CoderProperties.coderDecodeEncodeEqual(metadataCoderDynamic, metadata); + } + + private MetadataDynamicCoder getMetadataDynamicCoder() { + return new MetadataDynamicCoder() + .withCoderForField( + VarLongCoder.of(), + Metadata::lastModifiedMillis, + Metadata.Builder::setLastModifiedMillis); + } + + @Test + public void testCoderSerializable() { + CoderProperties.coderSerializable(getMetadataDynamicCoder()); + } +} From 420d4aa7705ac3a5766f08ab3b4247cd6c3330b6 Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 26 Apr 2022 11:14:24 +0300 Subject: [PATCH 09/11] [BEAM-13640][BEAM-12883] extract field coder needed stuff to a diff class (coder, getter, setter) --- .../beam/sdk/io/fs/MetadataDynamicCoder.java | 37 ++++++----- .../io/fs/MetadataFieldCoderDescription.java | 64 +++++++++++++++++++ 2 files changed, 82 insertions(+), 19 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java index 088fc0f67018..d9719994067f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java @@ -23,8 +23,8 @@ import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.SerializableBiConsumer; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -33,10 +33,7 @@ public class MetadataDynamicCoder extends StructuredCoder private static final MetadataCoder V1_CODER = MetadataCoder.of(); - private List> coders = new ArrayList<>(); - private List> getters = new ArrayList<>(); - private List> setters = - new ArrayList<>(); + private List fieldCoders = new ArrayList<>(); public MetadataDynamicCoder() {} @@ -44,35 +41,35 @@ public MetadataDynamicCoder withCoderForField( Coder coder, SerializableFunction getter, SerializableBiConsumer setter) { - coders.add(coder); - getters.add(getter); - setters.add(setter); + MetadataFieldCoderDescription metadataFieldCoderDescription = + new MetadataFieldCoderDescription(coder, getter, setter); + fieldCoders.add(metadataFieldCoderDescription); return this; } @Override - public void encode(MatchResult.Metadata metadata, OutputStream outStream) - throws CoderException, IOException { + public void encode(MatchResult.Metadata metadata, OutputStream outStream) throws IOException { V1_CODER.encode(metadata, outStream); - for (int i = 0; i < coders.size(); i++) { - SerializableFunction getter = getters.get(i); - Coder coder = coders.get(i); + for (MetadataFieldCoderDescription fieldCoderDescription : fieldCoders) { + SerializableFunction getter = + fieldCoderDescription.getGetter(); + Coder coder = fieldCoderDescription.getCoder(); try { coder.encode(getter.apply(metadata), outStream); } catch (IOException e) { throw new RuntimeException( - "Failed to encode " + getter.toString() + " with coder " + coder.getClass()); + "Failed to encode " + getter + " with coder " + coder.getClass()); } } } @Override - public MatchResult.Metadata decode(InputStream inStream) throws CoderException, IOException { + public MatchResult.Metadata decode(InputStream inStream) throws IOException { MatchResult.Metadata.Builder builder = V1_CODER.decodeBuilder(inStream); - for (int i = 0; i < coders.size(); i++) { - Coder coder = coders.get(i); - BiConsumer setter = setters.get(i); + for (MetadataFieldCoderDescription metadataFieldCoderDescription : fieldCoders) { + Coder coder = metadataFieldCoderDescription.getCoder(); + BiConsumer setter = metadataFieldCoderDescription.getSetter(); try { setter.accept(builder, coder.decode(inStream)); @@ -85,7 +82,9 @@ public MatchResult.Metadata decode(InputStream inStream) throws CoderException, @Override public List> getCoderArguments() { - return coders; + return fieldCoders.stream() + .map(MetadataFieldCoderDescription::getCoder) + .collect(Collectors.toList()); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java new file mode 100644 index 000000000000..f71bdf84b9b1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.SerializableBiConsumer; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MetadataFieldCoderDescription { + + private Coder coder; + private SerializableFunction getter; + private SerializableBiConsumer setter; + + public MetadataFieldCoderDescription( + Coder coder, + SerializableFunction getter, + SerializableBiConsumer setter) { + this.coder = coder; + this.getter = getter; + this.setter = setter; + } + + public MetadataFieldCoderDescription() {} + + public Coder getCoder() { + return coder; + } + + public void setCoder(Coder coder) { + this.coder = coder; + } + + public SerializableFunction getGetter() { + return getter; + } + + public void setGetter(SerializableFunction getter) { + this.getter = getter; + } + + public SerializableBiConsumer getSetter() { + return setter; + } + + public void setSetter(SerializableBiConsumer setter) { + this.setter = setter; + } +} From f26c9e88b195ce0b5eadfa12f9d4c74c29c0a458 Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 26 Apr 2022 11:39:49 +0300 Subject: [PATCH 10/11] [BEAM-13640][BEAM-12883] fix --- .../apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java index f71bdf84b9b1..9da1f707cf50 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java @@ -17,11 +17,12 @@ */ package org.apache.beam.sdk.io.fs; +import java.io.Serializable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.SerializableBiConsumer; import org.apache.beam.sdk.transforms.SerializableFunction; -public class MetadataFieldCoderDescription { +public class MetadataFieldCoderDescription implements Serializable { private Coder coder; private SerializableFunction getter; From 35d806bb15f530334ebadb14da914d33a0374551 Mon Sep 17 00:00:00 2001 From: brachipa Date: Fri, 13 May 2022 17:07:12 +0300 Subject: [PATCH 11/11] [BEAM-13640][BEAM-12883] fix error `initialization.fields.uninitialized` --- .../apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java index 9da1f707cf50..09c5f0b3d250 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java @@ -37,8 +37,6 @@ public MetadataFieldCoderDescription( this.setter = setter; } - public MetadataFieldCoderDescription() {} - public Coder getCoder() { return coder; }