diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java new file mode 100644 index 000000000000..d9719994067f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoder.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.transforms.SerializableBiConsumer; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MetadataDynamicCoder extends StructuredCoder { + + private static final MetadataCoder V1_CODER = MetadataCoder.of(); + + private List fieldCoders = new ArrayList<>(); + + public MetadataDynamicCoder() {} + + public MetadataDynamicCoder withCoderForField( + Coder coder, + SerializableFunction getter, + SerializableBiConsumer setter) { + MetadataFieldCoderDescription metadataFieldCoderDescription = + new MetadataFieldCoderDescription(coder, getter, setter); + fieldCoders.add(metadataFieldCoderDescription); + return this; + } + + @Override + public void encode(MatchResult.Metadata metadata, OutputStream outStream) throws IOException { + V1_CODER.encode(metadata, outStream); + for (MetadataFieldCoderDescription fieldCoderDescription : fieldCoders) { + SerializableFunction getter = + fieldCoderDescription.getGetter(); + Coder coder = fieldCoderDescription.getCoder(); + try { + coder.encode(getter.apply(metadata), outStream); + } catch (IOException e) { + throw new RuntimeException( + "Failed to encode " + getter + " with coder " + coder.getClass()); + } + } + } + + @Override + public MatchResult.Metadata decode(InputStream inStream) throws IOException { + MatchResult.Metadata.Builder builder = V1_CODER.decodeBuilder(inStream); + + for (MetadataFieldCoderDescription metadataFieldCoderDescription : fieldCoders) { + Coder coder = metadataFieldCoderDescription.getCoder(); + BiConsumer setter = metadataFieldCoderDescription.getSetter(); + + try { + setter.accept(builder, coder.decode(inStream)); + } catch (Exception e) { + throw new RuntimeException("Failed to decode with coder " + coder.getClass()); + } + } + return builder.build(); + } + + @Override + public List> getCoderArguments() { + return fieldCoders.stream() + .map(MetadataFieldCoderDescription::getCoder) + .collect(Collectors.toList()); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + for (Coder coder : getCoderArguments()) { + verifyDeterministic(this, "Coder must be deterministic " + coder.getClass(), coder); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java new file mode 100644 index 000000000000..09c5f0b3d250 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataFieldCoderDescription.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.SerializableBiConsumer; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MetadataFieldCoderDescription implements Serializable { + + private Coder coder; + private SerializableFunction getter; + private SerializableBiConsumer setter; + + public MetadataFieldCoderDescription( + Coder coder, + SerializableFunction getter, + SerializableBiConsumer setter) { + this.coder = coder; + this.getter = getter; + this.setter = setter; + } + + public Coder getCoder() { + return coder; + } + + public void setCoder(Coder coder) { + this.coder = coder; + } + + public SerializableFunction getGetter() { + return getter; + } + + public void setGetter(SerializableFunction getter) { + this.getter = getter; + } + + public SerializableBiConsumer getSetter() { + return setter; + } + + public void setSetter(SerializableBiConsumer setter) { + this.setter = setter; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java new file mode 100644 index 000000000000..40ad943ae289 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataDynamicCoderTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.nio.file.Path; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Tests for {@link org.apache.beam.sdk.io.fs.MetadataDynamicCoder}. */ +public class MetadataDynamicCoderTest { + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .build(); + + MetadataDynamicCoder metadataCoderDynamic = getMetadataDynamicCoder(); + + CoderProperties.coderDecodeEncodeEqual(metadataCoderDynamic, metadata); + } + + @Test + public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(1541097000L) + .build(); + MetadataDynamicCoder metadataCoderDynamic = getMetadataDynamicCoder(); + + CoderProperties.coderDecodeEncodeEqual(metadataCoderDynamic, metadata); + } + + private MetadataDynamicCoder getMetadataDynamicCoder() { + return new MetadataDynamicCoder() + .withCoderForField( + VarLongCoder.of(), + Metadata::lastModifiedMillis, + Metadata.Builder::setLastModifiedMillis); + } + + @Test + public void testCoderSerializable() { + CoderProperties.coderSerializable(getMetadataDynamicCoder()); + } +}