diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java index a87b6a629c63..a02de003ccc8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java @@ -153,7 +153,7 @@ private void verifyFreonCommand(ParameterBuilder parameterBuilder) "-r", String.valueOf(parameterBuilder.fileCountForRead), "-w", String.valueOf(parameterBuilder.fileCountForWrite), "-g", String.valueOf(parameterBuilder.fileSizeInBytes), - "-b", String.valueOf(parameterBuilder.bufferSize), + "--buffer", String.valueOf(parameterBuilder.bufferSize), "-l", String.valueOf(parameterBuilder.length), "-c", String.valueOf(parameterBuilder.totalThreadCount), "-T", String.valueOf(parameterBuilder.readThreadPercentage), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteKeyOps.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteKeyOps.java new file mode 100644 index 000000000000..bb3370396a6d --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteKeyOps.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.freon; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; + +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.lock.OMLockMetrics; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.raftlog.RaftLog; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Iterator; + +/** + * Test for OmBucketReadWriteKeyOps. + */ +public class TestOmBucketReadWriteKeyOps { + + // TODO: Remove code duplication of TestOmBucketReadWriteKeyOps with + // TestOmBucketReadWriteFileOps. + + private String path; + private OzoneConfiguration conf = null; + private MiniOzoneCluster cluster = null; + private ObjectStore store = null; + private static final Logger LOG = + LoggerFactory.getLogger(TestOmBucketReadWriteKeyOps.class); + + @Before + public void setup() { + path = GenericTestUtils + .getTempPath(TestHadoopDirTreeGenerator.class.getSimpleName()); + GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + File baseDir = new File(path); + baseDir.mkdirs(); + } + + /** + * Shutdown MiniDFSCluster. + */ + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Create a MiniDFSCluster for testing. + * + * @throws IOException + */ + private void startCluster() throws Exception { + conf = getOzoneConfiguration(); + conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, + BucketLayout.OBJECT_STORE.name()); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); + cluster.waitForClusterToBeReady(); + cluster.waitTobeOutOfSafeMode(); + + store = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + } + + private OzoneConfiguration getOzoneConfiguration() { + return new OzoneConfiguration(); + } + + @Test + public void testOmBucketReadWriteKeyOps() throws Exception { + try { + startCluster(); + FileOutputStream out = FileUtils.openOutputStream(new File(path, + "conf")); + cluster.getConf().writeXml(out); + out.getFD().sync(); + out.close(); + + verifyFreonCommand(new ParameterBuilder().setTotalThreadCount(10) + .setNumOfReadOperations(10).setNumOfWriteOperations(5) + .setKeyCountForRead(10).setKeyCountForWrite(5)); + verifyFreonCommand( + new ParameterBuilder().setVolumeName("vol2").setBucketName("bucket1") + .setTotalThreadCount(10).setNumOfReadOperations(10) + .setNumOfWriteOperations(5).setKeyCountForRead(10) + .setKeyCountForWrite(5)); + verifyFreonCommand( + new ParameterBuilder().setVolumeName("vol3").setBucketName("bucket1") + .setTotalThreadCount(15).setNumOfReadOperations(5) + .setNumOfWriteOperations(3).setKeyCountForRead(5) + .setKeyCountForWrite(3)); + verifyFreonCommand( + new ParameterBuilder().setVolumeName("vol4").setBucketName("bucket1") + .setTotalThreadCount(10).setNumOfReadOperations(5) + .setNumOfWriteOperations(3).setKeyCountForRead(5) + .setKeyCountForWrite(3).setKeySizeInBytes(64) + .setBufferSize(16)); + verifyFreonCommand( + new ParameterBuilder().setVolumeName("vol5").setBucketName("bucket1") + .setTotalThreadCount(10).setNumOfReadOperations(5) + .setNumOfWriteOperations(0).setKeyCountForRead(5)); + verifyFreonCommand( + new ParameterBuilder().setVolumeName("vol6").setBucketName("bucket1") + .setTotalThreadCount(20).setNumOfReadOperations(0) + .setNumOfWriteOperations(5).setKeyCountForRead(0) + .setKeyCountForWrite(5)); + + } finally { + shutdown(); + } + } + + private void verifyFreonCommand(ParameterBuilder parameterBuilder) + throws IOException { + store.createVolume(parameterBuilder.volumeName); + OzoneVolume volume = store.getVolume(parameterBuilder.volumeName); + volume.createBucket(parameterBuilder.bucketName); + OzoneBucket bucket = volume.getBucket(parameterBuilder.bucketName); + String confPath = new File(path, "conf").getAbsolutePath(); + + long startTime = System.currentTimeMillis(); + new Freon().execute( + new String[]{"-conf", confPath, "obrwk", + "-v", parameterBuilder.volumeName, + "-b", parameterBuilder.bucketName, + "-k", String.valueOf(parameterBuilder.keyCountForRead), + "-w", String.valueOf(parameterBuilder.keyCountForWrite), + "-g", String.valueOf(parameterBuilder.keySizeInBytes), + "--buffer", String.valueOf(parameterBuilder.bufferSize), + "-l", String.valueOf(parameterBuilder.length), + "-c", String.valueOf(parameterBuilder.totalThreadCount), + "-T", String.valueOf(parameterBuilder.readThreadPercentage), + "-R", String.valueOf(parameterBuilder.numOfReadOperations), + "-W", String.valueOf(parameterBuilder.numOfWriteOperations), + "-n", String.valueOf(1)}); + long totalTime = System.currentTimeMillis() - startTime; + LOG.info("Total Execution Time: " + totalTime); + + LOG.info("Started verifying OM bucket read/write ops key generation..."); + verifyKeyCreation(parameterBuilder.keyCountForRead, bucket, "/readPath/"); + + int readThreadCount = (parameterBuilder.readThreadPercentage * + parameterBuilder.totalThreadCount) / 100; + int writeThreadCount = parameterBuilder.totalThreadCount - readThreadCount; + verifyKeyCreation(writeThreadCount * parameterBuilder.keyCountForWrite * + parameterBuilder.numOfWriteOperations, bucket, "/writePath/"); + + verifyOMLockMetrics(cluster.getOzoneManager().getMetadataManager().getLock() + .getOMLockMetrics()); + } + + private void verifyKeyCreation(int expectedCount, OzoneBucket bucket, + String keyPrefix) throws IOException { + int actual = 0; + Iterator ozoneKeyIterator = bucket.listKeys(keyPrefix); + while (ozoneKeyIterator.hasNext()) { + ozoneKeyIterator.next(); + ++actual; + } + Assert.assertEquals("Mismatch Count!", expectedCount, actual); + } + + private void verifyOMLockMetrics(OMLockMetrics omLockMetrics) { + String readLockWaitingTimeMsStat = + omLockMetrics.getReadLockWaitingTimeMsStat(); + LOG.info("Read Lock Waiting Time Stat: " + readLockWaitingTimeMsStat); + LOG.info("Longest Read Lock Waiting Time (ms): " + + omLockMetrics.getLongestReadLockWaitingTimeMs()); + int readWaitingSamples = + Integer.parseInt(readLockWaitingTimeMsStat.split(" ")[2]); + Assert.assertTrue("Read Lock Waiting Samples should be positive", + readWaitingSamples > 0); + + String readLockHeldTimeMsStat = omLockMetrics.getReadLockHeldTimeMsStat(); + LOG.info("Read Lock Held Time Stat: " + readLockHeldTimeMsStat); + LOG.info("Longest Read Lock Held Time (ms): " + + omLockMetrics.getLongestReadLockHeldTimeMs()); + int readHeldSamples = + Integer.parseInt(readLockHeldTimeMsStat.split(" ")[2]); + Assert.assertTrue("Read Lock Held Samples should be positive", + readHeldSamples > 0); + + String writeLockWaitingTimeMsStat = + omLockMetrics.getWriteLockWaitingTimeMsStat(); + LOG.info("Write Lock Waiting Time Stat: " + writeLockWaitingTimeMsStat); + LOG.info("Longest Write Lock Waiting Time (ms): " + + omLockMetrics.getLongestWriteLockWaitingTimeMs()); + int writeWaitingSamples = + Integer.parseInt(writeLockWaitingTimeMsStat.split(" ")[2]); + Assert.assertTrue("Write Lock Waiting Samples should be positive", + writeWaitingSamples > 0); + + String writeLockHeldTimeMsStat = omLockMetrics.getWriteLockHeldTimeMsStat(); + LOG.info("Write Lock Held Time Stat: " + writeLockHeldTimeMsStat); + LOG.info("Longest Write Lock Held Time (ms): " + + omLockMetrics.getLongestWriteLockHeldTimeMs()); + int writeHeldSamples = + Integer.parseInt(writeLockHeldTimeMsStat.split(" ")[2]); + Assert.assertTrue("Write Lock Held Samples should be positive", + writeHeldSamples > 0); + } + + private static class ParameterBuilder { + + private String volumeName = "vol1"; + private String bucketName = "bucket1"; + private int keyCountForRead = 100; + private int keyCountForWrite = 10; + private long keySizeInBytes = 256; + private int bufferSize = 64; + private int length = 10; + private int totalThreadCount = 100; + private int readThreadPercentage = 90; + private int numOfReadOperations = 50; + private int numOfWriteOperations = 10; + + private ParameterBuilder setVolumeName(String volumeNameParam) { + volumeName = volumeNameParam; + return this; + } + + private ParameterBuilder setBucketName(String bucketNameParam) { + bucketName = bucketNameParam; + return this; + } + + private ParameterBuilder setKeyCountForRead(int keyCountForReadParam) { + keyCountForRead = keyCountForReadParam; + return this; + } + + private ParameterBuilder setKeyCountForWrite(int keyCountForWriteParam) { + keyCountForWrite = keyCountForWriteParam; + return this; + } + + private ParameterBuilder setKeySizeInBytes(long keySizeInBytesParam) { + keySizeInBytes = keySizeInBytesParam; + return this; + } + + private ParameterBuilder setBufferSize(int bufferSizeParam) { + bufferSize = bufferSizeParam; + return this; + } + + private ParameterBuilder setLength(int lengthParam) { + length = lengthParam; + return this; + } + + private ParameterBuilder setTotalThreadCount(int totalThreadCountParam) { + totalThreadCount = totalThreadCountParam; + return this; + } + + private ParameterBuilder setReadThreadPercentage( + int readThreadPercentageParam) { + readThreadPercentage = readThreadPercentageParam; + return this; + } + + private ParameterBuilder setNumOfReadOperations( + int numOfReadOperationsParam) { + numOfReadOperations = numOfReadOperationsParam; + return this; + } + + private ParameterBuilder setNumOfWriteOperations( + int numOfWriteOperationsParam) { + numOfWriteOperations = numOfWriteOperationsParam; + return this; + } + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/AbstractOmBucketReadWriteOps.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/AbstractOmBucketReadWriteOps.java new file mode 100644 index 000000000000..c5b1cb229d9f --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/AbstractOmBucketReadWriteOps.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.freon; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine.Option; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; + +/** + * Abstract class for OmBucketReadWriteFileOps/KeyOps Freon class + * implementations. + */ +public abstract class AbstractOmBucketReadWriteOps extends BaseFreonGenerator + implements Callable { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractOmBucketReadWriteOps.class); + + @Option(names = {"-g", "--size"}, + description = "Generated data size (in bytes) of each key/file to be " + + "written.", + defaultValue = "256") + private long sizeInBytes; + + @Option(names = {"--buffer"}, + description = "Size of buffer used for generating the key/file content.", + defaultValue = "64") + private int bufferSize; + + @Option(names = {"-l", "--name-len"}, + description = "Length of the random name of path you want to create.", + defaultValue = "10") + private int length; + + @Option(names = {"-c", "--total-thread-count"}, + description = "Total number of threads to be executed.", + defaultValue = "100") + private int totalThreadCount; + + @Option(names = {"-T", "--read-thread-percentage"}, + description = "Percentage of the total number of threads to be " + + "allocated for read operations. The remaining percentage of " + + "threads will be allocated for write operations.", + defaultValue = "90") + private int readThreadPercentage; + + @Option(names = {"-R", "--num-of-read-operations"}, + description = "Number of read operations to be performed by each thread.", + defaultValue = "50") + private int numOfReadOperations; + + @Option(names = {"-W", "--num-of-write-operations"}, + description = "Number of write operations to be performed by each " + + "thread.", + defaultValue = "10") + private int numOfWriteOperations; + + private OzoneConfiguration ozoneConfiguration; + private Timer timer; + private ContentGenerator contentGenerator; + private int readThreadCount; + private int writeThreadCount; + + protected abstract void display(); + + protected abstract void initialize(OzoneConfiguration configuration) + throws Exception; + + @Override + public Void call() throws Exception { + init(); + + readThreadCount = (readThreadPercentage * totalThreadCount) / 100; + writeThreadCount = totalThreadCount - readThreadCount; + + display(); + print("SizeInBytes: " + sizeInBytes); + print("bufferSize: " + bufferSize); + print("totalThreadCount: " + totalThreadCount); + print("readThreadPercentage: " + readThreadPercentage); + print("writeThreadPercentage: " + (100 - readThreadPercentage)); + print("readThreadCount: " + readThreadCount); + print("writeThreadCount: " + writeThreadCount); + print("numOfReadOperations: " + numOfReadOperations); + print("numOfWriteOperations: " + numOfWriteOperations); + + ozoneConfiguration = createOzoneConfiguration(); + contentGenerator = new ContentGenerator(sizeInBytes, bufferSize); + timer = getMetrics().timer("om-bucket-read-write-ops"); + + initialize(ozoneConfiguration); + + return null; + } + + protected abstract String createPath(String path) throws IOException; + + protected int readOperations(int keyCountForRead) throws Exception { + + // Create keyCountForRead/fileCountForRead (defaultValue = 1000) keys/files + // under rootPath/readPath + String readPath = createPath("readPath"); + create(readPath, keyCountForRead); + + // Start readThreadCount (defaultValue = 90) concurrent read threads + // performing numOfReadOperations (defaultValue = 50) iterations + // of read operations (bucket.listKeys(readPath) or + // fileSystem.listStatus(rootPath/readPath)) + ExecutorService readService = Executors.newFixedThreadPool(readThreadCount); + CompletionService readExecutorCompletionService = + new ExecutorCompletionService<>(readService); + List> readFutures = new ArrayList<>(); + for (int i = 0; i < readThreadCount; i++) { + readFutures.add(readExecutorCompletionService.submit(() -> { + int readCount = 0; + try { + for (int j = 0; j < numOfReadOperations; j++) { + readCount = getReadCount(readCount, "readPath"); + } + } catch (IOException e) { + LOG.warn("Exception while listing keys/files ", e); + } + return readCount; + })); + } + + int readResult = 0; + for (int i = 0; i < readFutures.size(); i++) { + readResult += readExecutorCompletionService.take().get(); + } + readService.shutdown(); + + return readResult; + } + + protected abstract int getReadCount(int readCount, String readPath) + throws IOException; + + protected int writeOperations(int keyCountForWrite) throws Exception { + + // Start writeThreadCount (defaultValue = 10) concurrent write threads + // performing numOfWriteOperations (defaultValue = 10) iterations + // of write operations (createKeys(writePath) or + // createFiles(rootPath/writePath)) + String writePath = createPath("writePath"); + + ExecutorService writeService = + Executors.newFixedThreadPool(writeThreadCount); + CompletionService writeExecutorCompletionService = + new ExecutorCompletionService<>(writeService); + List> writeFutures = new ArrayList<>(); + for (int i = 0; i < writeThreadCount; i++) { + writeFutures.add(writeExecutorCompletionService.submit(() -> { + int writeCount = 0; + try { + for (int j = 0; j < numOfWriteOperations; j++) { + create(writePath, keyCountForWrite); + writeCount++; + } + } catch (IOException e) { + LOG.warn("Exception while creating keys/files ", e); + } + return writeCount; + })); + } + + int writeResult = 0; + for (int i = 0; i < writeFutures.size(); i++) { + writeResult += writeExecutorCompletionService.take().get(); + } + writeService.shutdown(); + + return writeResult; + } + + protected void create(String path, int keyCount) + throws Exception { + for (int i = 0; i < keyCount; i++) { + String keyName = path.concat(OzoneConsts.OM_KEY_PREFIX) + .concat(RandomStringUtils.randomAlphanumeric(length)); + if (LOG.isDebugEnabled()) { + LOG.debug("Key/FileName : {}", keyName); + } + timer.time(() -> { + try (OutputStream stream = create(keyName)) { + contentGenerator.write(stream); + stream.flush(); + } + return null; + }); + } + } + + protected abstract OutputStream create(String pathName) throws IOException; + + protected long getSizeInBytes() { + return sizeInBytes; + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index 914b70969eef..ab04d6d96807 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -64,7 +64,8 @@ ClosedContainerReplicator.class, StreamingGenerator.class, SCMThroughputBenchmark.class, - OmBucketReadWriteFileOps.class}, + OmBucketReadWriteFileOps.class, + OmBucketReadWriteKeyOps.class}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true) public class Freon extends GenericCli { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java index d8837f2ce399..80207a8860df 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java @@ -16,31 +16,18 @@ */ package org.apache.hadoop.ozone.freon; -import com.codahale.metrics.Timer; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import java.io.IOException; +import java.io.OutputStream; import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; /** * Synthetic read/write file operations workload generator tool. @@ -52,12 +39,9 @@ versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true, showDefaultValues = true) -@SuppressWarnings("java:S2245") // no need for secure random -public class OmBucketReadWriteFileOps extends BaseFreonGenerator - implements Callable { - private static final Logger LOG = - LoggerFactory.getLogger(OmBucketReadWriteFileOps.class); +@SuppressWarnings("java:S2245") // no need for secure random +public class OmBucketReadWriteFileOps extends AbstractOmBucketReadWriteOps { @Option(names = {"-P", "--root-path"}, description = "Root path", @@ -74,89 +58,26 @@ public class OmBucketReadWriteFileOps extends BaseFreonGenerator defaultValue = "10") private int fileCountForWrite; - @Option(names = {"-g", "--file-size"}, - description = "Generated data size (in bytes) of each file to be " + - "written in each directory.", - defaultValue = "256") - private long fileSizeInBytes; - - @Option(names = {"-b", "--buffer"}, - description = "Size of buffer used to generated the file content.", - defaultValue = "64") - private int bufferSize; - - @Option(names = {"-l", "--name-len"}, - description = "Length of the random name of directory you want to " + - "create.", - defaultValue = "10") - private int length; - - @Option(names = {"-c", "--total-thread-count"}, - description = "Total number of threads to be executed.", - defaultValue = "100") - private int totalThreadCount; - - @Option(names = {"-T", "--read-thread-percentage"}, - description = "Percentage of the total number of threads to be " + - "allocated for read operations. The remaining percentage of " + - "threads will be allocated for write operations.", - defaultValue = "90") - private int readThreadPercentage; - - @Option(names = {"-R", "--num-of-read-operations"}, - description = "Number of read operations to be performed by each thread.", - defaultValue = "50") - private int numOfReadOperations; - - @Option(names = {"-W", "--num-of-write-operations"}, - description = "Number of write operations to be performed by each " + - "thread.", - defaultValue = "10") - private int numOfWriteOperations; - - private Timer timer; - - private ContentGenerator contentGenerator; - private FileSystem fileSystem; - private int readThreadCount; - private int writeThreadCount; - @Override - public Void call() throws Exception { - init(); - - readThreadCount = (readThreadPercentage * totalThreadCount) / 100; - writeThreadCount = totalThreadCount - readThreadCount; - + protected void display() { print("rootPath: " + rootPath); print("fileCountForRead: " + fileCountForRead); print("fileCountForWrite: " + fileCountForWrite); - print("fileSizeInBytes: " + fileSizeInBytes); - print("bufferSize: " + bufferSize); - print("totalThreadCount: " + totalThreadCount); - print("readThreadPercentage: " + readThreadPercentage); - print("writeThreadPercentage: " + (100 - readThreadPercentage)); - print("readThreadCount: " + readThreadCount); - print("writeThreadCount: " + writeThreadCount); - print("numOfReadOperations: " + numOfReadOperations); - print("numOfWriteOperations: " + numOfWriteOperations); - - OzoneConfiguration configuration = createOzoneConfiguration(); - fileSystem = FileSystem.get(URI.create(rootPath), configuration); - - contentGenerator = new ContentGenerator(fileSizeInBytes, bufferSize); - timer = getMetrics().timer("file-create"); + } + @Override + protected void initialize(OzoneConfiguration ozoneConfiguration) + throws IOException { + fileSystem = FileSystem.get(URI.create(rootPath), ozoneConfiguration); runTests(this::mainMethod); - return null; } private void mainMethod(long counter) throws Exception { - int readResult = readOperations(); - int writeResult = writeOperations(); + int readResult = readOperations(fileCountForRead); + int writeResult = writeOperations(fileCountForWrite); print("Total Files Read: " + readResult); print("Total Files Written: " + writeResult * fileCountForWrite); @@ -164,115 +85,24 @@ private void mainMethod(long counter) throws Exception { // TODO: print read/write lock metrics (HDDS-6435, HDDS-6436). } - private int readOperations() throws Exception { - - // Create fileCountForRead (defaultValue = 1000) files under - // rootPath/readPath directory - String readPath = - rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("readPath"); - fileSystem.mkdirs(new Path(readPath)); - createFiles(readPath, fileCountForRead); - - // Start readThreadCount (defaultValue = 90) concurrent read threads - // performing numOfReadOperations (defaultValue = 50) iterations - // of read operations (fileSystem.listStatus(rootPath/readPath)) - ExecutorService readService = Executors.newFixedThreadPool(readThreadCount); - CompletionService readExecutorCompletionService = - new ExecutorCompletionService<>(readService); - List> readFutures = new ArrayList<>(); - for (int i = 0; i < readThreadCount; i++) { - readFutures.add(readExecutorCompletionService.submit(() -> { - int readCount = 0; - try { - for (int j = 0; j < numOfReadOperations; j++) { - FileStatus[] status = - fileSystem.listStatus(new Path(readPath)); - readCount += status.length; - } - } catch (IOException e) { - LOG.warn("Exception while listing status ", e); - } - return readCount; - })); - } - - int readResult = 0; - for (int i = 0; i < readFutures.size(); i++) { - try { - readResult += readExecutorCompletionService.take().get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - } - readService.shutdown(); - - return readResult; + @Override + protected String createPath(String path) throws IOException { + String fullPath = rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat(path); + fileSystem.mkdirs(new Path(fullPath)); + return fullPath; } - private int writeOperations() throws Exception { - - // Start writeThreadCount (defaultValue = 10) concurrent write threads - // performing numOfWriteOperations (defaultValue = 10) iterations - // of write operations (createFiles(rootPath/writePath)) - String writePath = - rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("writePath"); - fileSystem.mkdirs(new Path(writePath)); - - ExecutorService writeService = - Executors.newFixedThreadPool(writeThreadCount); - CompletionService writeExecutorCompletionService = - new ExecutorCompletionService<>(writeService); - List> writeFutures = new ArrayList<>(); - for (int i = 0; i < writeThreadCount; i++) { - writeFutures.add(writeExecutorCompletionService.submit(() -> { - int writeCount = 0; - try { - for (int j = 0; j < numOfWriteOperations; j++) { - createFiles(writePath, fileCountForWrite); - writeCount++; - } - } catch (IOException e) { - LOG.warn("Exception while creating file ", e); - } - return writeCount; - })); - } - - int writeResult = 0; - for (int i = 0; i < writeFutures.size(); i++) { - try { - writeResult += writeExecutorCompletionService.take().get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - } - writeService.shutdown(); - - return writeResult; + @Override + protected int getReadCount(int readCount, String readPath) + throws IOException { + FileStatus[] status = fileSystem.listStatus(new Path(createPath(readPath))); + readCount += status.length; + return readCount; } - private void createFile(String dir, long counter) throws Exception { - String fileName = dir.concat(OzoneConsts.OM_KEY_PREFIX) - .concat(RandomStringUtils.randomAlphanumeric(length)); + @Override + protected OutputStream create(String fileName) throws IOException { Path file = new Path(fileName); - if (LOG.isDebugEnabled()) { - LOG.debug("FilePath:{}", file); - } - timer.time(() -> { - try (FSDataOutputStream output = fileSystem.create(file)) { - contentGenerator.write(output); - } - return null; - }); - } - - private void createFiles(String dir, int fileCount) throws Exception { - for (int i = 0; i < fileCount; i++) { - createFile(dir, i); - } + return fileSystem.create(file); } } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteKeyOps.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteKeyOps.java new file mode 100644 index 000000000000..459cc2affb22 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteKeyOps.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.freon; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneKey; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Iterator; + +/** + * Synthetic read/write key operations workload generator tool. + */ +@Command(name = "obrwk", + aliases = "om-bucket-read-write-key-ops", + description = "Creates keys, performs respective read/write " + + "operations to measure lock performance.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) + +@SuppressWarnings("java:S2245") // no need for secure random +public class OmBucketReadWriteKeyOps extends AbstractOmBucketReadWriteOps { + + @Option(names = {"-v", "--volume"}, + description = "Name of the volume which contains the test data. Will be" + + " created if missing.", + defaultValue = "vol1") + private String volumeName; + + @Option(names = {"-b", "--bucket"}, + description = "Name of the bucket which contains the test data. Will be" + + " created if missing.", + defaultValue = "bucket1") + private String bucketName; + + @Option(names = {"-k", "--key-count-for-read"}, + description = "Number of keys to be created for read operations.", + defaultValue = "100") + private int keyCountForRead; + + @Option(names = {"-w", "--key-count-for-write"}, + description = "Number of keys to be created for write operations.", + defaultValue = "10") + private int keyCountForWrite; + + @Option(names = {"-o", "--om-service-id"}, + description = "OM Service ID" + ) + private String omServiceID = null; + + @CommandLine.Mixin + private FreonReplicationOptions replication; + + private Map metadata; + + private ReplicationConfig replicationConfig; + + private OzoneBucket bucket; + + @Override + protected void display() { + print("volumeName: " + volumeName); + print("bucketName: " + bucketName); + print("keyCountForRead: " + keyCountForRead); + print("keyCountForWrite: " + keyCountForWrite); + print("omServiceID: " + omServiceID); + } + + @Override + protected void initialize(OzoneConfiguration ozoneConfiguration) + throws Exception { + + replicationConfig = replication.fromParamsOrConfig(ozoneConfiguration); + metadata = new HashMap<>(); + + try (OzoneClient rpcClient = createOzoneClient(omServiceID, + ozoneConfiguration)) { + ensureVolumeAndBucketExist(rpcClient, volumeName, bucketName); + bucket = rpcClient.getObjectStore().getVolume(volumeName) + .getBucket(bucketName); + + runTests(this::mainMethod); + } + } + + private void mainMethod(long counter) throws Exception { + + int readResult = readOperations(keyCountForRead); + int writeResult = writeOperations(keyCountForWrite); + + print("Total Keys Read: " + readResult); + print("Total Keys Written: " + writeResult * keyCountForWrite); + + // TODO: print read/write lock metrics (HDDS-6435, HDDS-6436). + } + + @Override + protected String createPath(String path) { + return "".concat(OzoneConsts.OM_KEY_PREFIX).concat(path); + } + + @Override + protected int getReadCount(int readCount, String readPath) + throws IOException { + Iterator ozoneKeyIterator = bucket.listKeys( + OzoneConsts.OM_KEY_PREFIX + readPath + OzoneConsts.OM_KEY_PREFIX); + while (ozoneKeyIterator.hasNext()) { + ozoneKeyIterator.next(); + ++readCount; + } + return readCount; + } + + @Override + protected OutputStream create(String keyName) throws IOException { + return bucket.createKey(keyName, getSizeInBytes(), replicationConfig, + metadata); + } +}