From d2a6e4dab909cc8427dab0a2d1aaa2039f71581f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 10 Nov 2022 11:34:20 -0800 Subject: [PATCH 1/3] HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. --- .../TestOzoneFileSystemWithStreaming.java | 145 ++++++++++++++++++ .../fs/ozone/BasicOzoneClientAdapterImpl.java | 21 +-- 2 files changed, 152 insertions(+), 14 deletions(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java new file mode 100644 index 000000000000..27318611b3ae --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.ozone; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.fail; + +/** + * Ozone file system tests with Streaming. + */ +public class TestOzoneFileSystemWithStreaming { + @Rule + public Timeout timeout = Timeout.seconds(300); + + private static final Logger LOG + = LoggerFactory.getLogger(TestOzoneFileSystemWithStreaming.class); + + private static final BucketLayout BUCKET_LAYOUT + = BucketLayout.FILE_SYSTEM_OPTIMIZED; + + private static MiniOzoneCluster cluster; + private static FileSystem fs; + + { + try { + init(); + } catch (Exception e) { + LOG.info("Unexpected exception", e); + fail("Unexpected exception:" + e.getMessage()); + } + } + + private void init() throws Exception { + final int chunkSize = 16 << 10; + final int flushSize = 2 * chunkSize; + final int maxFlushSize = 2 * flushSize; + final int blockSize = 2 * maxFlushSize; + + final OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, + true); + conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, false); + conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, + BUCKET_LAYOUT.name()); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(5) + .setTotalPipelineNumLimit(10) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setDataStreamBufferFlushize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .setDataStreamMinPacketSize(chunkSize) + .setDataStreamStreamWindowSize(5 * chunkSize) + .build(); + cluster.waitForClusterToBeReady(); + + // create a volume and a bucket to be used by OzoneFileSystem + final OzoneBucket bucket = TestDataUtil.createVolumeAndBucket( + cluster, BUCKET_LAYOUT); + + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + fs = FileSystem.get(conf); + } + + @AfterClass + public static void teardown() { + if (cluster != null) { + 
cluster.shutdown(); + } + IOUtils.closeQuietly(fs); + } + + @Test + public void testCreateFile() throws IOException { + final byte[] bytes = new byte[1 << 20]; + ThreadLocalRandom.current().nextBytes(bytes); + + final Path file = new Path("/file"); + ContractTestUtils.createFile(fs, file, true, bytes); + + final byte[] buffer = new byte[4 << 10]; + int offset = 0; + try (FSDataInputStream in = fs.open(file)) { + for (; ;) { + final int n = in.read(buffer, 0, buffer.length); + if (n <= 0) { + break; + } + for (int i = 0; i < n; i++) { + Assertions.assertEquals(bytes[offset + i], buffer[i]); + } + offset += n; + } + } + Assertions.assertEquals(bytes.length, offset); + } +} diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index 0c21ab6ec017..dafc6a55157a 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -272,20 +272,13 @@ public OzoneFSDataStreamOutput createStreamFile(String key, short replication, boolean overWrite, boolean recursive) throws IOException { incrementCounter(Statistic.OBJECTS_CREATED, 1); try { - OzoneDataStreamOutput ozoneDataStreamOutput = null; - if (replication == ReplicationFactor.ONE.getValue() - || replication == ReplicationFactor.THREE.getValue()) { - - ReplicationConfig customReplicationConfig = - ReplicationConfig.adjustReplication(bucketReplicationConfig, - replication, config); - ozoneDataStreamOutput = bucket - .createStreamFile(key, 0, customReplicationConfig, overWrite, - recursive); - } else { - ozoneDataStreamOutput = bucket.createStreamFile( - key, 0, bucketReplicationConfig, overWrite, recursive); - } + final ReplicationConfig replicationConfig + = OzoneClientUtils.resolveClientSideReplicationConfig( + replication, clientConfiguredReplicationConfig, + getReplicationConfigWithRefreshCheck(), config); + final OzoneDataStreamOutput ozoneDataStreamOutput + = bucket.createStreamFile(key, 0, replicationConfig, + overWrite, recursive); return new OzoneFSDataStreamOutput( ozoneDataStreamOutput.getByteBufStreamOutput()); } catch (OMException ex) { From c7b48f51ac7ff4b5308572f10418998cd3db2dd4 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 10 Nov 2022 23:10:14 -0800 Subject: [PATCH 2/3] Fix also BasicRootedOzoneClientAdapterImpl. 
--- .../TestOzoneFileSystemWithStreaming.java | 84 +++++++++++-------- .../fs/ozone/BasicOzoneClientAdapterImpl.java | 8 +- .../BasicRootedOzoneClientAdapterImpl.java | 23 ++--- 3 files changed, 59 insertions(+), 56 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java index 27318611b3ae..1e3a40978f6c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.ozone; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; @@ -27,24 +26,26 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; import org.junit.jupiter.api.Assertions; import org.junit.rules.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; -import static org.junit.Assert.fail; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; /** * Ozone file system tests with Streaming. 
@@ -53,21 +54,15 @@ public class TestOzoneFileSystemWithStreaming { @Rule public Timeout timeout = Timeout.seconds(300); - private static final Logger LOG - = LoggerFactory.getLogger(TestOzoneFileSystemWithStreaming.class); - - private static final BucketLayout BUCKET_LAYOUT - = BucketLayout.FILE_SYSTEM_OPTIMIZED; - + private static final OzoneConfiguration conf = new OzoneConfiguration(); private static MiniOzoneCluster cluster; - private static FileSystem fs; + private static OzoneBucket bucket; { try { init(); } catch (Exception e) { - LOG.info("Unexpected exception", e); - fail("Unexpected exception:" + e.getMessage()); + throw new IllegalStateException(e); } } @@ -76,14 +71,12 @@ private void init() throws Exception { final int flushSize = 2 * chunkSize; final int maxFlushSize = 2 * flushSize; final int blockSize = 2 * maxFlushSize; + final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED; - final OzoneConfiguration conf = new OzoneConfiguration(); - conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, - true); - conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true); - conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, false); - conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, - BUCKET_LAYOUT.name()); + conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true); + conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true); + conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name()); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5) .setTotalPipelineNumLimit(10) @@ -99,15 +92,7 @@ private void init() throws Exception { cluster.waitForClusterToBeReady(); // create a volume and a bucket to be used by OzoneFileSystem - final OzoneBucket bucket = TestDataUtil.createVolumeAndBucket( - cluster, BUCKET_LAYOUT); - - // Set the fs.defaultFS - final String rootPath = String.format("%s://%s.%s/", - OzoneConsts.OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); - conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - - fs = FileSystem.get(conf); + bucket = TestDataUtil.createVolumeAndBucket(cluster, layout); } @AfterClass @@ -115,15 +100,42 @@ public static void teardown() { if (cluster != null) { cluster.shutdown(); } - IOUtils.closeQuietly(fs); } @Test - public void testCreateFile() throws IOException { + public void testO3fsCreateFile() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final Path file = new Path("/file"); + + try(FileSystem fs = FileSystem.get(conf)) { + runTestCreateFile(fs, file); + } + } + + @Test + public void testOfsCreateFile() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + final Path file = new Path(dir, "file"); + + try(FileSystem fs = FileSystem.get(conf)) { + runTestCreateFile(fs, file); + } + } + + static void runTestCreateFile(FileSystem fs, Path file) throws Exception { final byte[] bytes = new byte[1 << 20]; ThreadLocalRandom.current().nextBytes(bytes); - final Path file = new Path("/file"); ContractTestUtils.createFile(fs, file, true, bytes); final byte[] buffer = new 
byte[4 << 10]; diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index dafc6a55157a..65607aae5c39 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -276,11 +276,9 @@ public OzoneFSDataStreamOutput createStreamFile(String key, short replication, = OzoneClientUtils.resolveClientSideReplicationConfig( replication, clientConfiguredReplicationConfig, getReplicationConfigWithRefreshCheck(), config); - final OzoneDataStreamOutput ozoneDataStreamOutput - = bucket.createStreamFile(key, 0, replicationConfig, - overWrite, recursive); - return new OzoneFSDataStreamOutput( - ozoneDataStreamOutput.getByteBufStreamOutput()); + final OzoneDataStreamOutput out = bucket.createStreamFile( + key, 0, replicationConfig, overWrite, recursive); + return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput()); } catch (OMException ex) { if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java index 5d51cec7e9c2..843bcd8119d7 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java @@ -406,21 +406,14 @@ public OzoneFSDataStreamOutput createStreamFile(String pathStr, String key = ofsPath.getKeyName(); try { // Hadoop CopyCommands class always sets recursive to true - OzoneBucket bucket = getBucket(ofsPath, recursive); - OzoneDataStreamOutput ozoneDataStreamOutput = null; - if (replication == ReplicationFactor.ONE.getValue() - || replication == ReplicationFactor.THREE.getValue()) { - - ozoneDataStreamOutput = bucket.createStreamFile(key, 0, - ReplicationConfig.adjustReplication( - clientConfiguredReplicationConfig, replication, config), - overWrite, recursive); - } else { - ozoneDataStreamOutput = bucket.createStreamFile( - key, 0, clientConfiguredReplicationConfig, overWrite, recursive); - } - return new OzoneFSDataStreamOutput( - ozoneDataStreamOutput.getByteBufStreamOutput()); + final OzoneBucket bucket = getBucket(ofsPath, recursive); + final ReplicationConfig replicationConfig + = OzoneClientUtils.resolveClientSideReplicationConfig( + replication, clientConfiguredReplicationConfig, + bucket.getReplicationConfig(), config); + final OzoneDataStreamOutput out = bucket.createStreamFile( + key, 0, replicationConfig, overWrite, recursive); + return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput()); } catch (OMException ex) { if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { From bf069f307ac735fe221fe8ed050260f2c25cd2a3 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 10 Nov 2022 23:35:05 -0800 Subject: [PATCH 3/3] Fix checkstyle. 
--- .../hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java index 1e3a40978f6c..f2aa52759833 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java @@ -54,10 +54,11 @@ public class TestOzoneFileSystemWithStreaming { @Rule public Timeout timeout = Timeout.seconds(300); - private static final OzoneConfiguration conf = new OzoneConfiguration(); private static MiniOzoneCluster cluster; private static OzoneBucket bucket; + private final OzoneConfiguration conf = new OzoneConfiguration(); + { try { init(); @@ -111,7 +112,7 @@ public void testO3fsCreateFile() throws Exception { final Path file = new Path("/file"); - try(FileSystem fs = FileSystem.get(conf)) { + try (FileSystem fs = FileSystem.get(conf)) { runTestCreateFile(fs, file); } } @@ -127,7 +128,7 @@ public void testOfsCreateFile() throws Exception { + OZONE_URI_DELIMITER + bucket.getName(); final Path file = new Path(dir, "file"); - try(FileSystem fs = FileSystem.get(conf)) { + try (FileSystem fs = FileSystem.get(conf)) { runTestCreateFile(fs, file); } }
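Note on the shared fix: after this series, both BasicOzoneClientAdapterImpl and
BasicRootedOzoneClientAdapterImpl resolve replication through
OzoneClientUtils.resolveClientSideReplicationConfig instead of branching on the raw
replication factor. Below is a minimal sketch of that pattern, assembled from the hunks
above; the wrapping method is hypothetical, and the adapter fields
(clientConfiguredReplicationConfig, config) are assumed to be in scope as in the diff.

    // Hypothetical helper illustrating the shared pattern (not part of the patches);
    // imports and field names follow the diff above.
    private OzoneFSDataStreamOutput createStreamFileSketch(
        OzoneBucket bucket, String key, short replication,
        boolean overWrite, boolean recursive) throws IOException {
      // Resolve the effective replication config on the client side. The helper falls
      // back from the requested factor to the client-configured and bucket defaults,
      // so an unset bucket replication config is no longer used directly (presumably
      // the source of the NPE reported in HDDS-7478). The o3fs adapter passes
      // getReplicationConfigWithRefreshCheck() as the third argument instead.
      final ReplicationConfig replicationConfig =
          OzoneClientUtils.resolveClientSideReplicationConfig(
              replication, clientConfiguredReplicationConfig,
              bucket.getReplicationConfig(), config);

      // Open the streaming output and wrap it for the Hadoop FileSystem API.
      final OzoneDataStreamOutput out = bucket.createStreamFile(
          key, 0, replicationConfig, overWrite, recursive);
      return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
    }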