From d2e5106045ea18db91dae7b30893f06a71f388da Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:12:10 +0300 Subject: [PATCH 1/7] add coder readable files --- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java new file mode 100644 index 000000000000..824630b045b1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MetadataCoderV2; + +/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +public class ReadableFileCoder extends AtomicCoder { + private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); + + /** Returns the instance of {@link ReadableFileCoder}. */ + public static ReadableFileCoder of() { + return INSTANCE; + } + + @Override + public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { + MetadataCoderV2.of().encode(value.getMetadata(), os); + VarIntCoder.of().encode(value.getCompression().ordinal(), os); + } + + @Override + public FileIO.ReadableFile decode(InputStream is) throws IOException { + MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); + Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + return new FileIO.ReadableFile(metadata, compression); + } +} From 058400d4d954c3b9ce615edced06629926559a8a Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:12:10 +0300 Subject: [PATCH 2/7] add coder readable files --- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java new file mode 100644 index 000000000000..db8258166f14 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MetadataCoderV2; + +/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +public class ReadableFileCoderV2 extends AtomicCoder { + private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); + + /** Returns the instance of {@link ReadableFileCoder}. */ + public static ReadableFileCoderV2 of() { + return INSTANCE; + } + + @Override + public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { + MetadataCoderV2.of().encode(value.getMetadata(), os); + VarIntCoder.of().encode(value.getCompression().ordinal(), os); + } + + @Override + public FileIO.ReadableFile decode(InputStream is) throws IOException { + MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); + Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + return new FileIO.ReadableFile(metadata, compression); + } +} From 4dec390c48ffb621f2d09d385771b13a9eaa8ed1 Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 14 Sep 2021 23:50:10 +0300 Subject: [PATCH 3/7] :sdks:java:core:spotlessApply --- .../java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java index db8258166f14..36e8ecbc0385 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java @@ -26,7 +26,10 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MetadataCoderV2; -/** A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link Metadata#lastModifiedMillis()}. */ +/** + * A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link + * Metadata#lastModifiedMillis()}. + */ public class ReadableFileCoderV2 extends AtomicCoder { private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); From a224a66e76c49ead497b953dceab5899fa1df309 Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 22:57:51 +0300 Subject: [PATCH 4/7] [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder --- .../apache/beam/sdk/io/ReadableFileCoder.java | 12 +++-- .../beam/sdk/io/fs/ReadableFileCoderV2.java | 53 ------------------- 2 files changed, 9 insertions(+), 56 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 51bb83ea9cc0..9f629b415a91 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -24,26 +24,32 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MetadataCoder; /** A {@link Coder} for {@link FileIO.ReadableFile}. */ public class ReadableFileCoder extends AtomicCoder { private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); + private final AtomicCoder metadataCoder; /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { + this.metadataCoder = MetadataCoder.of(); + return INSTANCE; + } + + public static ReadableFileCoder of(AtomicCoder metadataCoder) { + this.metadataCoder = metadataCoder; return INSTANCE; } @Override public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - MetadataCoder.of().encode(value.getMetadata(), os); + metadataCoder.encode(value.getMetadata(), os); VarIntCoder.of().encode(value.getCompression().ordinal(), os); } @Override public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = MetadataCoder.of().decode(is); + MatchResult.Metadata metadata = metadataCoder.decode(is); Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; return new FileIO.ReadableFile(metadata, compression); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java deleted file mode 100644 index 36e8ecbc0385..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ReadableFileCoderV2.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MetadataCoderV2; - -/** - * A {@link Coder} for {@link FileIO.ReadableFile} that includes {@link - * Metadata#lastModifiedMillis()}. - */ -public class ReadableFileCoderV2 extends AtomicCoder { - private static final ReadableFileCoderV2 INSTANCE = new ReadableFileCoderV2(); - - /** Returns the instance of {@link ReadableFileCoder}. */ - public static ReadableFileCoderV2 of() { - return INSTANCE; - } - - @Override - public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - MetadataCoderV2.of().encode(value.getMetadata(), os); - VarIntCoder.of().encode(value.getCompression().ordinal(), os); - } - - @Override - public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is); - Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; - return new FileIO.ReadableFile(metadata, compression); - } -} From 1e3e452d9fc2879e620a63171a3f88f42947a40e Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 23:06:33 +0300 Subject: [PATCH 5/7] [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder --- .../java/org/apache/beam/sdk/io/ReadableFileCoder.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 9f629b415a91..7895db760ee5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -24,20 +24,22 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MetadataCoder; /** A {@link Coder} for {@link FileIO.ReadableFile}. */ public class ReadableFileCoder extends AtomicCoder { private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); - private final AtomicCoder metadataCoder; + private AtomicCoder metadataCoder; /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { - this.metadataCoder = MetadataCoder.of(); + INSTANCE.metadataCoder = MetadataCoder.of(); return INSTANCE; } public static ReadableFileCoder of(AtomicCoder metadataCoder) { - this.metadataCoder = metadataCoder; + INSTANCE.metadataCoder = metadataCoder; return INSTANCE; } From 6cdddfe02600de7e248089013bc8b2c0eb494289 Mon Sep 17 00:00:00 2001 From: brachipa Date: Sun, 3 Oct 2021 23:32:31 +0300 Subject: [PATCH 6/7] [BEAM-12883] add ability t0 set custom MetadataCoder using StructuredCoder --- .../apache/beam/sdk/io/ReadableFileCoder.java | 62 +++++++++++++------ 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 7895db760ee5..ee6e4a6bd081 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -20,39 +20,63 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.*; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; -/** A {@link Coder} for {@link FileIO.ReadableFile}. */ -public class ReadableFileCoder extends AtomicCoder { - private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); - private AtomicCoder metadataCoder; +/** A {@link Coder} for {@link org.apache.beam.sdk.io.FileIO.ReadableFile}. */ +public class ReadableFileCoder extends StructuredCoder { + + private final Coder metadataCoder; + + public static ReadableFileCoder of(Coder metadataCoder) { + return new ReadableFileCoder(metadataCoder); + } - /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { - INSTANCE.metadataCoder = MetadataCoder.of(); - return INSTANCE; + return new ReadableFileCoder(MetadataCoder.of()); } - public static ReadableFileCoder of(AtomicCoder metadataCoder) { - INSTANCE.metadataCoder = metadataCoder; - return INSTANCE; + public Coder getMetadataCoder() { + return metadataCoder; + } + + private ReadableFileCoder(Coder metadataCoder) { + this.metadataCoder = metadataCoder; } @Override - public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - metadataCoder.encode(value.getMetadata(), os); - VarIntCoder.of().encode(value.getCompression().ordinal(), os); + public void encode( + FileIO.ReadableFile value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws CoderException, IOException { + getMetadataCoder().encode(value.getMetadata(), outStream); + VarIntCoder.of().encode(value.getCompression().ordinal(), outStream); } @Override - public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = metadataCoder.decode(is); - Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + public FileIO.ReadableFile decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws CoderException, IOException { + MatchResult.Metadata metadata = getMetadataCoder().decode(inStream); + Compression compression = Compression.values()[VarIntCoder.of().decode(inStream)]; return new FileIO.ReadableFile(metadata, compression); } + + @Override + public @UnknownKeyFor @NonNull @Initialized List> getCoderArguments() { + return Collections.singletonList(metadataCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + // ignore the default Metadata coder for backward compatible + if (!getMetadataCoder().equals(MetadataCoder.of())) { + verifyDeterministic(this, "Metadata coder must be deterministic", getMetadataCoder()); + } + } } From 6c0906aecd1d864fa4d12ffe05467a63ab0c3ede Mon Sep 17 00:00:00 2001 From: brachipa Date: Tue, 5 Oct 2021 11:26:06 +0300 Subject: [PATCH 7/7] [BEAM-12883] remove asterisk-based import --- .../main/java/org/apache/beam/sdk/io/ReadableFileCoder.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index ee6e4a6bd081..bdf10c47eef3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -22,7 +22,10 @@ import java.io.OutputStream; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder;