From 077a266956bb96c06d3dcfebeac1ee83fb81701b Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Sun, 30 Oct 2022 18:09:19 +0800
Subject: [PATCH 1/3] HDDS-7438. Add a createStreamKey method to OzoneBucket.

---
 .../hadoop/ozone/client/OzoneBucket.java      | 32 ++++++---
 .../rpc/TestOzoneRpcClientWithRatis.java      | 70 +++++++++++++++++++
 2 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index d171a43f8a64..b0a8e965c4bb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -53,6 +53,7 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -600,6 +601,21 @@ public OzoneOutputStream createKey(String key, long size,
         .createKey(volumeName, name, key, size, replicationConfig,
             keyMetadata);
   }
+
+  /**
+   * Creates a new key in the bucket, using the bucket's default
+   * replication config.
+   *
+   * @param key Name of the key to be created.
+   * @param size Size of the data the key will point to.
+   * @return OzoneDataStreamOutput to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size)
+      throws IOException {
+    return createStreamKey(key, size, defaultReplication,
+        Collections.emptyMap());
+  }
 
   /**
    * Creates a new key in the bucket.
@@ -610,12 +626,13 @@ public OzoneOutputStream createKey(String key, long size,
    * @throws IOException
    */
   public OzoneDataStreamOutput createStreamKey(String key, long size,
-      ReplicationConfig replicationConfig,
-      Map<String, String> keyMetadata)
+      ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
       throws IOException {
-    return proxy
-        .createStreamKey(volumeName, name, key, size, replicationConfig,
-            keyMetadata);
+    if (replicationConfig == null) {
+      replicationConfig = defaultReplication;
+    }
+    return proxy.createStreamKey(volumeName, name, key, size,
+        replicationConfig, keyMetadata);
   }
 
   /**
@@ -958,9 +975,8 @@ public OzoneOutputStream createFile(String keyName, long size,
   public OzoneDataStreamOutput createStreamFile(String keyName, long size,
       ReplicationConfig replicationConfig, boolean overWrite,
       boolean recursive) throws IOException {
-    return proxy
-        .createStreamFile(volumeName, name, keyName, size, replicationConfig,
-            overWrite, recursive);
+    return proxy.createStreamFile(volumeName, name, keyName, size,
+        replicationConfig, overWrite, recursive);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 65551bc4e932..7104666cf975 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.ozone.client.rpc;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -33,6 +38,7 @@
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
@@ -42,8 +48,10 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.ozone.test.GenericTestUtils;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -208,4 +216,66 @@ public void testMultiPartUploadWithStream() throws IOException {
     Assert.assertEquals(valueLength, partInfo.getSize());
   }
+
+  @Test
+  public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
+    // create a local dir
+    final String dir = GenericTestUtils.getTempPath(
+        getClass().getSimpleName());
+    GenericTestUtils.assertDirCreation(new File(dir));
+
+    // create a local file
+    final int chunkSize = 1024;
+    final byte[] data = new byte[8 * chunkSize];
+    ThreadLocalRandom.current().nextBytes(data);
+    final File file = new File(dir, "data");
+    try(FileOutputStream out = new FileOutputStream(file)) {
+      out.write(data);
+    }
+
+    // create a volume
+    final String volumeName = "vol-" + UUID.randomUUID();
+    getStore().createVolume(volumeName);
+    final OzoneVolume volume = getStore().getVolume(volumeName);
+
+    // create a bucket
+    final String bucketName = "buck-" + UUID.randomUUID();
+    volume.createBucket(bucketName);
+    final OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // upload a key from the local file using memory-mapped buffers
+    final String keyName = "key-" + UUID.randomUUID();
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
+        OzoneDataStreamOutput out = bucket.createStreamKey(
+        keyName, data.length)) {
+      final FileChannel channel = raf.getChannel();
+      long off = 0;
+      for (long len = raf.length(); len > 0; ) {
+        final long writeLen = Math.min(len, chunkSize);
+        final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
+            off, writeLen);
+        out.write(mapped);
+        off += writeLen;
+        len -= writeLen;
+      }
+    }
+
+    // verify the key details
+    final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    Assertions.assertEquals(keyName, keyDetails.getName());
+    Assertions.assertEquals(data.length, keyDetails.getDataSize());
+
+    // verify the key content
+    final byte[] buffer = new byte[data.length];
+    try (OzoneInputStream in = keyDetails.getContent()) {
+      for(int off = 0; off < data.length; ) {
+        final int n = in.read(buffer, off, data.length - off);
+        if (n < 0) {
+          break;
+        }
+        off += n;
+      }
+    }
+    Assertions.assertArrayEquals(data, buffer);
+  }
 }

From 1afae5ab1240a6d23418e30680a4733fe878de76 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Sun, 30 Oct 2022 19:55:07 +0800
Subject: [PATCH 2/3] Fix indentations.

---
 .../ozone/client/rpc/TestOzoneRpcClientWithRatis.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 7104666cf975..c75bdb388af1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -229,7 +229,7 @@ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
     final byte[] data = new byte[8 * chunkSize];
     ThreadLocalRandom.current().nextBytes(data);
     final File file = new File(dir, "data");
-    try(FileOutputStream out = new FileOutputStream(file)) {
+    try (FileOutputStream out = new FileOutputStream(file)) {
       out.write(data);
     }
 
@@ -246,11 +246,11 @@ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
     // upload a key from the local file using memory-mapped buffers
     final String keyName = "key-" + UUID.randomUUID();
     try (RandomAccessFile raf = new RandomAccessFile(file, "r");
-        OzoneDataStreamOutput out = bucket.createStreamKey(
-        keyName, data.length)) {
+         OzoneDataStreamOutput out = bucket.createStreamKey(
+             keyName, data.length)) {
       final FileChannel channel = raf.getChannel();
       long off = 0;
-      for (long len = raf.length(); len > 0; ) {
+      for (long len = raf.length(); len > 0;) {
         final long writeLen = Math.min(len, chunkSize);
         final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
             off, writeLen);
@@ -268,7 +268,7 @@ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
     // verify the key content
     final byte[] buffer = new byte[data.length];
     try (OzoneInputStream in = keyDetails.getContent()) {
-      for(int off = 0; off < data.length; ) {
+      for (int off = 0; off < data.length;) {
         final int n = in.read(buffer, off, data.length - off);
         if (n < 0) {
           break;

From ae3961152fb447c106f8f026a22c444823be8ff9 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Sun, 30 Oct 2022 20:04:19 +0800
Subject: [PATCH 3/3] Set bucket DefaultReplicationConfig.

---
 .../ozone/client/rpc/TestOzoneRpcClientWithRatis.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index c75bdb388af1..ca26ca177cc6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -29,11 +29,13 @@
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -240,7 +242,11 @@ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
 
     // create a bucket
     final String bucketName = "buck-" + UUID.randomUUID();
-    volume.createBucket(bucketName);
+    final BucketArgs bucketArgs = BucketArgs.newBuilder()
+        .setDefaultReplicationConfig(
+            new DefaultReplicationConfig(ReplicationType.RATIS, THREE))
+        .build();
+    volume.createBucket(bucketName, bucketArgs);
     final OzoneBucket bucket = volume.getBucket(bucketName);
 
     // upload a key from the local file using memory-mapped buffers
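
The sketch below is not part of the patch series. It is a minimal illustration of how a client could call the two-argument OzoneBucket#createStreamKey overload added in PATCH 1/3, streaming a local file through memory-mapped buffers much like the new test does. The volume, bucket and key names, the local path, and the chunk size are placeholders, and the example assumes an Ozone cluster reachable through the default OzoneConfiguration.

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

public final class CreateStreamKeyExample {
  public static void main(String[] args) throws Exception {
    final long chunkSize = 1 << 20;  // map and write 1 MB at a time
    try (OzoneClient client =
             OzoneClientFactory.getRpcClient(new OzoneConfiguration());
         RandomAccessFile raf = new RandomAccessFile("/tmp/data", "r")) {
      // placeholder volume/bucket names; both must already exist
      final OzoneBucket bucket = client.getObjectStore()
          .getVolume("vol1").getBucket("bucket1");
      final long size = raf.length();
      // the two-argument overload picks up the bucket's default
      // replication config (PATCH 1/3)
      try (OzoneDataStreamOutput out = bucket.createStreamKey("key1", size)) {
        final FileChannel channel = raf.getChannel();
        for (long off = 0; off < size;) {
          final long len = Math.min(size - off, chunkSize);
          final MappedByteBuffer mapped =
              channel.map(FileChannel.MapMode.READ_ONLY, off, len);
          out.write(mapped);  // OzoneDataStreamOutput accepts ByteBuffers
          off += len;
        }
      }
    }
  }
}

Mapping bounded chunks keeps the client's heap usage flat regardless of file size, which is the same pattern the new test exercises.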
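
Likewise a small sketch, not taken from the patch, of the behaviour change in the four-argument overload: after PATCH 1/3, a null ReplicationConfig is replaced with the bucket's default replication config inside OzoneBucket instead of being passed through to the proxy. Names are again placeholders.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

public final class NullReplicationConfigExample {
  public static void main(String[] args) throws Exception {
    final byte[] payload = "hello ozone".getBytes(StandardCharsets.UTF_8);
    try (OzoneClient client =
             OzoneClientFactory.getRpcClient(new OzoneConfiguration())) {
      final OzoneBucket bucket = client.getObjectStore()
          .getVolume("vol1").getBucket("bucket1");
      // a null replication config now resolves to the bucket default,
      // e.g. RATIS/THREE when the bucket is created as in PATCH 3/3
      try (OzoneDataStreamOutput out = bucket.createStreamKey(
          "key2", payload.length, null, Collections.emptyMap())) {
        out.write(ByteBuffer.wrap(payload));
      }
    }
  }
}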