From c909e78f5daad7ba96ba7a30d2fb0cdb864060bf Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 7 Nov 2024 11:13:49 +0100 Subject: [PATCH 1/3] TEZ-3331: Add operation specific HDFS counters for Tez UI --- .../common/counters/FileSystemCounter.java | 44 +++++-- tez-runtime-internals/pom.xml | 17 +++ .../metrics/FileSystemStatisticUpdater.java | 56 ++------- .../runtime/metrics/TaskCounterUpdater.java | 52 ++++---- .../TestFileSystemStatisticUpdater.java | 115 ++++++++++++++++++ .../metrics/TestTaskCounterUpdater.java | 59 +++++++++ .../tez/runtime/task/TestTaskExecution2.java | 9 +- 7 files changed, 263 insertions(+), 89 deletions(-) create mode 100644 tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java create mode 100644 tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 73e358179b..6cf15b2a52 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -19,16 +19,42 @@ package org.apache.tez.common.counters; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; @Private public enum FileSystemCounter { - BYTES_READ, - BYTES_WRITTEN, - READ_OPS, - LARGE_READ_OPS, - WRITE_OPS, - HDFS_BYTES_READ, - HDFS_BYTES_WRITTEN, - FILE_BYTES_READ, - FILE_BYTES_WRITTEN + BYTES_READ("bytesRead"), + BYTES_WRITTEN("bytesWritten"), + READ_OPS("readOps"), + LARGE_READ_OPS("largeReadOps"), + WRITE_OPS("writeOps"), + HDFS_BYTES_READ("hdfsBytesRead"), + HDFS_BYTES_WRITTEN("hdfsBytesWritten"), + FILE_BYTES_READ("fileBytesRead"), + FILE_BYTES_WRITTEN("fileBytesWritten"), + + // Additional counters from HADOOP-13305 + 
OP_APPEND(CommonStatisticNames.OP_APPEND), + OP_CREATE(CommonStatisticNames.OP_CREATE), + OP_DELETE(CommonStatisticNames.OP_DELETE), + OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS), + OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES), + OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS), + OP_MKDIRS(CommonStatisticNames.OP_MKDIRS), + OP_OPEN(CommonStatisticNames.OP_OPEN), + OP_RENAME(CommonStatisticNames.OP_RENAME), + OP_SET_ACL(CommonStatisticNames.OP_SET_ACL), + OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER), + OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION), + OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"); + + private final String opName; + + FileSystemCounter(String opName) { + this.opName = opName; + } + + public String getOpName() { + return opName; + } } diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml index 503bf2b103..e235e933cd 100644 --- a/tez-runtime-internals/pom.xml +++ b/tez-runtime-internals/pom.xml @@ -49,6 +49,23 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-common + test + test-jar + + + org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + org.apache.hadoop hadoop-yarn-api diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java index bb15ef159f..4347340d7c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -17,9 +17,7 @@ package org.apache.tez.runtime.metrics; -import java.util.List; - -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -30,50 +28,22 @@ */ public class FileSystemStatisticUpdater { - private List stats; - private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter, - writeOpsCounter; - private String scheme; + private StorageStatistics stats; private TezCounters counters; - FileSystemStatisticUpdater(TezCounters counters, List stats, String scheme) { - this.stats = stats; - this.scheme = scheme; + FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) { + this.stats = storageStatistics; this.counters = counters; } void updateCounters() { - if (readBytesCounter == null) { - readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ); - } - if (writeBytesCounter == null) { - writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN); - } - if (readOpsCounter == null) { - readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS); - } - if (largeReadOpsCounter == null) { - largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS); - } - if (writeOpsCounter == null) { - writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS); - } - long readBytes = 0; - long writeBytes = 0; - long readOps = 0; - long largeReadOps = 0; - long writeOps = 0; - for (FileSystem.Statistics stat : 
stats) { - readBytes = readBytes + stat.getBytesRead(); - writeBytes = writeBytes + stat.getBytesWritten(); - readOps = readOps + stat.getReadOps(); - largeReadOps = largeReadOps + stat.getLargeReadOps(); - writeOps = writeOps + stat.getWriteOps(); + // loop through FileSystemCounter enums as it is a smaller set + for (FileSystemCounter fsCounter : FileSystemCounter.values()) { + Long val = stats.getLong(fsCounter.getOpName()); + if (val != null && val != 0) { + TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter); + counter.setValue(val); + } } - readBytesCounter.setValue(readBytes); - writeBytesCounter.setValue(writeBytes); - readOpsCounter.setValue(readOps); - largeReadOpsCounter.setValue(largeReadOps); - writeOpsCounter.setValue(writeOps); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index 48676e225b..d0ab041b50 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -18,17 +18,18 @@ package org.apache.tez.runtime.metrics; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.Iterator; import java.util.Map; +import java.util.TreeMap; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.tez.common.GcTimeUpdater; import org.apache.tez.common.counters.TaskCounter; @@ -48,11 +49,16 @@ public class TaskCounterUpdater { private 
final TezCounters tezCounters; private final Configuration conf; +// /** +// * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater +// */ +// private Map statisticUpdaters = +// new HashMap<>(); /** - * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater + * A Map where Key-> URIScheme and value->Map */ - private Map statisticUpdaters = - new HashMap(); + private Map> statisticUpdaters = + new HashMap<>(); protected final GcTimeUpdater gcUpdater; private ResourceCalculatorProcessTree pTree; private long initCpuCumulativeTime = 0; @@ -69,32 +75,16 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid) public void updateCounters() { - // FileSystemStatistics are reset each time a new task is seen by the - // container. - // This doesn't remove the fileSystem, and does not clear all statistics - - // so there is a potential of an unused FileSystem showing up for a - // Container, and strange values for READ_OPS etc. - Map> map = new - HashMap>(); - for(Statistics stat: FileSystem.getAllStatistics()) { - String uriScheme = stat.getScheme(); - if (map.containsKey(uriScheme)) { - List list = map.get(uriScheme); - list.add(stat); - } else { - List list = new ArrayList(); - list.add(stat); - map.put(uriScheme, list); - } - } - for (Map.Entry> entry: map.entrySet()) { - FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey()); - if(updater==null) {//new FileSystem has been found in the cache - updater = - new FileSystemStatisticUpdater(tezCounters, entry.getValue(), - entry.getKey()); - statisticUpdaters.put(entry.getKey(), updater); + GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics(); + Iterator iter = globalStorageStatistics.iterator(); + while (iter.hasNext()) { + StorageStatistics stats = iter.next(); + if (!statisticUpdaters.containsKey(stats.getScheme())) { + Map updaterSet = new TreeMap<>(); + statisticUpdaters.put(stats.getScheme(), updaterSet); } + 
FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme()) + .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats)); updater.updateCounters(); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java new file mode 100644 index 0000000000..040960907c --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.metrics; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestFileSystemStatisticUpdater { + + private static final Logger LOG = LoggerFactory.getLogger( + TestFileSystemStatisticUpdater.class); + + private static MiniDFSCluster dfsCluster; + + private static Configuration conf = new Configuration(); + private static FileSystem remoteFs; + + private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestFileSystemStatisticUpdater.class.getName() + "-tmpDir"; + + @BeforeClass + public static void setup() throws IOException { + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) + .build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + } + + @AfterClass + public static void tearDown() { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + @Test + public void basicTest() throws IOException { + TezCounters counters = new 
TezCounters(); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + + remoteFs.mkdirs(new Path("/tmp/foo/")); + FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt")); + out.writeBytes("xyz"); + out.close(); + + updater.updateCounters(); + + LOG.info("Counters: " + counters); + TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(), + FileSystemCounter.OP_MKDIRS); + TezCounter createCounter = counters.findCounter(remoteFs.getScheme(), + FileSystemCounter.OP_CREATE); + Assert.assertNotNull(mkdirCounter); + Assert.assertNotNull(createCounter); + Assert.assertEquals(1, mkdirCounter.getValue()); + Assert.assertEquals(1, createCounter.getValue()); + + FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt")); + out1.writeBytes("xyz"); + out1.close(); + + long oldCreateVal = createCounter.getValue(); + updater.updateCounters(); + + LOG.info("Counters: " + counters); + Assert.assertTrue("Counter not updated, old=" + oldCreateVal + + ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal); + + oldCreateVal = createCounter.getValue(); + // Ensure all numbers are reset + remoteFs.clearStatistics(); + updater.updateCounters(); + LOG.info("Counters: " + counters); + Assert.assertEquals(oldCreateVal, createCounter.getValue()); + + } + + + +} diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java new file mode 100644 index 0000000000..88b0941fe7 --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestTaskCounterUpdater { + + private static final Logger LOG = LoggerFactory.getLogger( + TestTaskCounterUpdater.class); + private static Configuration conf = new Configuration(); + + @Test + public void basicTest() { + TezCounters counters = new TezCounters(); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + + updater.updateCounters(); + LOG.info("Counters: " + counters); + TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); + TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); + + Assert.assertNotNull(gcCounter); + Assert.assertNotNull(cpuCounter); + long oldVal = cpuCounter.getValue(); + Assert.assertTrue(cpuCounter.getValue() > 0); + + updater.updateCounters(); + LOG.info("Counters: " + counters); + Assert.assertTrue("Counter not updated, old=" + oldVal + + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal); + + } + + +} diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index aeaec53124..9c26a321ca 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.tez.common.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListeningExecutorService; @@ -181,7 +180,9 @@ public void testMultipleSuccessfulTasks() throws IOException, InterruptedExcepti assertFalse(TestProcessor.wasAborted()); umbilical.resetTrackedEvents(); TezCounters tezCounters = runtimeTask.getCounters(); - verifySysCounters(tezCounters, 5, 5); + // with TEZ-3331, fs counters are not set if the value is 0 (see FileSystemStatisticUpdater), so there can be + // a mismatch in task counter count and fs counter count + verifySysCounters(tezCounters, 5, 0); taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY, false); @@ -693,10 +694,6 @@ public void testClusterStorageCapacityFatalError() throws IOException { private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) { - Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) || - (minTaskCounterCount <= 0 && minFsCounterCount <= 0), - "Both targetCounter counts should be postitive or negative. 
A mix is not expected"); - int numTaskCounters = 0; int numFsCounters = 0; for (CounterGroup counterGroup : tezCounters) { From d642287d06c0bb02c7955f11f360363ecdb416c4 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Sun, 24 Nov 2024 01:04:04 +0100 Subject: [PATCH 2/3] PR comments, further refactoring --- .../common/counters/FileSystemCounter.java | 37 +++++++++-- .../metrics/FileSystemStatisticUpdater.java | 4 +- .../runtime/metrics/TaskCounterUpdater.java | 10 +-- .../TestFileSystemStatisticUpdater.java | 66 +++++++++---------- .../metrics/TestTaskCounterUpdater.java | 24 +++---- 5 files changed, 81 insertions(+), 60 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 6cf15b2a52..9ec34d7c82 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -21,6 +21,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; +/** + * FileSystemCounter is an enum for defining which filesystem/storage statistics are exposed in Tez. 
+ */ @Private public enum FileSystemCounter { BYTES_READ("bytesRead"), @@ -28,25 +31,49 @@ public enum FileSystemCounter { READ_OPS("readOps"), LARGE_READ_OPS("largeReadOps"), WRITE_OPS("writeOps"), - HDFS_BYTES_READ("hdfsBytesRead"), - HDFS_BYTES_WRITTEN("hdfsBytesWritten"), - FILE_BYTES_READ("fileBytesRead"), - FILE_BYTES_WRITTEN("fileBytesWritten"), + // Additional counters from HADOOP-13305 // Additional counters from HADOOP-13305 OP_APPEND(CommonStatisticNames.OP_APPEND), + OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE), OP_CREATE(CommonStatisticNames.OP_CREATE), + OP_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE), OP_DELETE(CommonStatisticNames.OP_DELETE), + OP_EXISTS(CommonStatisticNames.OP_EXISTS), + OP_GET_CONTENT_SUMMARY(CommonStatisticNames.OP_GET_CONTENT_SUMMARY), + OP_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN), + OP_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM), OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS), + OP_GET_STATUS(CommonStatisticNames.OP_GET_STATUS), + OP_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS), + OP_IS_FILE(CommonStatisticNames.OP_IS_FILE), + OP_IS_DIRECTORY(CommonStatisticNames.OP_IS_DIRECTORY), OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES), OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS), + OP_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS), OP_MKDIRS(CommonStatisticNames.OP_MKDIRS), + OP_MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES), OP_OPEN(CommonStatisticNames.OP_OPEN), + OP_REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL), + OP_REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES), + OP_REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL), OP_RENAME(CommonStatisticNames.OP_RENAME), OP_SET_ACL(CommonStatisticNames.OP_SET_ACL), OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER), OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION), - 
OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"); + OP_SET_TIMES(CommonStatisticNames.OP_SET_TIMES), + OP_TRUNCATE(CommonStatisticNames.OP_TRUNCATE), + + // counters below are not needed in production, as the scheme_countername expansion is taken care of by the + // FileSystemCounterGroup, the only reason they are here is that some analyzers still depend on them + @Deprecated + HDFS_BYTES_READ("hdfsBytesRead"), + @Deprecated + HDFS_BYTES_WRITTEN("hdfsBytesWritten"), + @Deprecated + FILE_BYTES_READ("fileBytesRead"), + @Deprecated + FILE_BYTES_WRITTEN("fileBytesWritten"); private final String opName; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java index 4347340d7c..ad48d0d624 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java @@ -28,8 +28,8 @@ */ public class FileSystemStatisticUpdater { - private StorageStatistics stats; - private TezCounters counters; + private final StorageStatistics stats; + private final TezCounters counters; FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) { this.stats = storageStatistics; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index d0ab041b50..75e3e4c4ea 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -79,11 +79,11 @@ public void updateCounters() { Iterator iter = globalStorageStatistics.iterator(); while (iter.hasNext()) { StorageStatistics stats = iter.next(); - 
if (!statisticUpdaters.containsKey(stats.getScheme())) { - Map updaterSet = new TreeMap<>(); - statisticUpdaters.put(stats.getScheme(), updaterSet); - } - FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme()) + // Fetch or initialize the updater set for the scheme + Map updaterSet = statisticUpdaters + .computeIfAbsent(stats.getScheme(), k -> new TreeMap<>()); + // Fetch or create the updater for the specific statistic + FileSystemStatisticUpdater updater = updaterSet .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats)); updater.updateCounters(); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java index 040960907c..60cd0e8901 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; @@ -42,7 +42,7 @@ public class TestFileSystemStatisticUpdater { private static MiniDFSCluster dfsCluster; - private static Configuration conf = new Configuration(); + private static final Configuration CONF = new Configuration(); private static FileSystem remoteFs; private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + @@ -51,9 +51,8 @@ public class TestFileSystemStatisticUpdater { @BeforeClass public static void setup() throws IOException { try { - 
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) - .build(); + CONF.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build(); remoteFs = dfsCluster.getFileSystem(); } catch (IOException io) { throw new RuntimeException("problem starting mini dfs cluster", io); @@ -71,45 +70,40 @@ public static void tearDown() { @Test public void basicTest() throws IOException { TezCounters counters = new TezCounters(); - TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, CONF, "pid"); - remoteFs.mkdirs(new Path("/tmp/foo/")); - FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt")); - out.writeBytes("xyz"); - out.close(); + DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz"); updater.updateCounters(); + LOG.info("Counters (after first update): {}", counters); + assertCounter(counters, FileSystemCounter.OP_MKDIRS, 0); // DFSTestUtil doesn't call separate mkdirs + assertCounter(counters, FileSystemCounter.OP_CREATE, 1); + assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 3); // "xyz" + assertCounter(counters, FileSystemCounter.WRITE_OPS, 1); + assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 1); // DFSTestUtil calls fs.exists + assertCounter(counters, FileSystemCounter.OP_CREATE, 1); - LOG.info("Counters: " + counters); - TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(), - FileSystemCounter.OP_MKDIRS); - TezCounter createCounter = counters.findCounter(remoteFs.getScheme(), - FileSystemCounter.OP_CREATE); - Assert.assertNotNull(mkdirCounter); - Assert.assertNotNull(createCounter); - Assert.assertEquals(1, mkdirCounter.getValue()); - Assert.assertEquals(1, createCounter.getValue()); - - FSDataOutputStream out1 = remoteFs.create(new 
Path("/tmp/foo/abc1.txt")); - out1.writeBytes("xyz"); - out1.close(); - - long oldCreateVal = createCounter.getValue(); - updater.updateCounters(); + DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc1.txt"), "xyz"); - LOG.info("Counters: " + counters); - Assert.assertTrue("Counter not updated, old=" + oldCreateVal - + ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal); + updater.updateCounters(); + LOG.info("Counters (after second update): {}", counters); + assertCounter(counters, FileSystemCounter.OP_CREATE, 2); + assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 6); // "xyz" has been written twice + assertCounter(counters, FileSystemCounter.WRITE_OPS, 2); + assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 2); // DFSTestUtil calls fs.exists again + assertCounter(counters, FileSystemCounter.OP_CREATE, 2); - oldCreateVal = createCounter.getValue(); // Ensure all numbers are reset - remoteFs.clearStatistics(); + FileSystem.clearStatistics(); updater.updateCounters(); - LOG.info("Counters: " + counters); - Assert.assertEquals(oldCreateVal, createCounter.getValue()); - + LOG.info("Counters (after third update): {}", counters); + // counter holds its value after clearStatistics + updateCounters + assertCounter(counters, FileSystemCounter.OP_CREATE, 2); } - - + private void assertCounter(TezCounters counters, FileSystemCounter fsCounter, int value) { + TezCounter counter = counters.findCounter(remoteFs.getScheme(), fsCounter); + Assert.assertNotNull(counter); + Assert.assertEquals(value, counter.getValue()); + } } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java index 88b0941fe7..aa782396cb 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java +++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java @@ -29,31 +29,31 @@ public class TestTaskCounterUpdater { - private static final Logger LOG = LoggerFactory.getLogger( - TestTaskCounterUpdater.class); - private static Configuration conf = new Configuration(); + private static final Logger LOG = LoggerFactory.getLogger(TestTaskCounterUpdater.class); + private static final Configuration CONF = new Configuration(); @Test public void basicTest() { TezCounters counters = new TezCounters(); - TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, CONF, "pid"); updater.updateCounters(); - LOG.info("Counters: " + counters); - TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); - TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); + LOG.info("Counters (after first update): {}", counters); + assertCounter(counters, TaskCounter.GC_TIME_MILLIS); + TezCounter cpuCounter = assertCounter(counters, TaskCounter.CPU_MILLISECONDS); - Assert.assertNotNull(gcCounter); - Assert.assertNotNull(cpuCounter); long oldVal = cpuCounter.getValue(); Assert.assertTrue(cpuCounter.getValue() > 0); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters (after second update): {}", counters); Assert.assertTrue("Counter not updated, old=" + oldVal + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal); - } - + private TezCounter assertCounter(TezCounters counters, TaskCounter taskCounter) { + TezCounter counter = counters.findCounter(taskCounter); + Assert.assertNotNull(counter); + return counter; + } } From 082e2b5b276fb6953b4ab652c79c5eb85d7a9d3c Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 12 Dec 2024 12:18:09 +0100 Subject: [PATCH 3/3] PR comments 2 --- .../common/counters/FileSystemCounter.java | 1 - .../runtime/metrics/TaskCounterUpdater.java | 13 +++------ 
.../TestFileSystemStatisticUpdater.java | 27 ++++++++++++------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 9ec34d7c82..fdb93f1194 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -32,7 +32,6 @@ public enum FileSystemCounter { LARGE_READ_OPS("largeReadOps"), WRITE_OPS("writeOps"), - // Additional counters from HADOOP-13305 // Additional counters from HADOOP-13305 OP_APPEND(CommonStatisticNames.OP_APPEND), OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE), diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index 75e3e4c4ea..49f8fca25f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.TreeMap; import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.StorageStatistics; @@ -49,16 +48,10 @@ public class TaskCounterUpdater { private final TezCounters tezCounters; private final Configuration conf; -// /** -// * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater -// */ -// private Map statisticUpdaters = -// new HashMap<>(); /** * A Map where Key-> URIScheme and value->Map */ - private Map> statisticUpdaters = - new HashMap<>(); + private final Map> statisticUpdaters = new HashMap<>(); protected final GcTimeUpdater gcUpdater; private ResourceCalculatorProcessTree pTree; private long initCpuCumulativeTime = 0; @@ 
-73,7 +66,7 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid) recordInitialCpuStats(); } - + public void updateCounters() { GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics(); Iterator iter = globalStorageStatistics.iterator(); @@ -81,7 +74,7 @@ public void updateCounters() { StorageStatistics stats = iter.next(); // Fetch or initialize the updater set for the scheme Map updaterSet = statisticUpdaters - .computeIfAbsent(stats.getScheme(), k -> new TreeMap<>()); + .computeIfAbsent(stats.getScheme(), k -> new HashMap<>()); // Fetch or create the updater for the specific statistic FileSystemStatisticUpdater updater = updaterSet .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats)); diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java index 60cd0e8901..b07f811ded 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java @@ -30,6 +30,7 @@ import org.apache.tez.common.counters.TezCounters; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -49,14 +50,8 @@ public class TestFileSystemStatisticUpdater { TestFileSystemStatisticUpdater.class.getName() + "-tmpDir"; @BeforeClass - public static void setup() throws IOException { - try { - CONF.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - dfsCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build(); - remoteFs = dfsCluster.getFileSystem(); - } catch (IOException io) { - throw new RuntimeException("problem starting mini dfs cluster", io); - } + public static void 
beforeClass() throws Exception { + CONF.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); } @AfterClass @@ -67,6 +62,21 @@ public static void tearDown() { } } + @Before + public void setup() throws IOException { + FileSystem.clearStatistics(); + try { + // tear down the whole cluster before each test to completely get rid of file system statistics + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + dfsCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + } + @Test public void basicTest() throws IOException { TezCounters counters = new TezCounters(); @@ -94,7 +104,6 @@ public void basicTest() throws IOException { assertCounter(counters, FileSystemCounter.OP_CREATE, 2); // Ensure all numbers are reset - FileSystem.clearStatistics(); updater.updateCounters(); LOG.info("Counters (after third update): {}", counters); // counter holds its value after clearStatistics + updateCounters