From b98caa3fe3902eacccc5e6d21c9558b189cdced7 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 31 Jan 2024 17:44:10 -0800 Subject: [PATCH 01/10] Add OpenTracing traces to read path. --- .../scm/storage/ContainerProtocolCalls.java | 74 ++++++++++++++++--- .../hadoop/ozone/client/rpc/RpcClient.java | 2 + 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 03b7844cc941..cfeea64b8431 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -29,6 +29,9 @@ import java.util.concurrent.ExecutionException; import java.util.function.Function; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -64,6 +67,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.security.token.Token; @@ -128,6 +132,10 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient, if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -143,7 +151,8 @@ static T tryEachDatanode(Pipeline pipeline, for (; ;) { final DatanodeDetails d = pipeline.getClosestNode(excluded); - try { + try (AutoCloseable scope = TracingUtil + .createActivatedSpan(d.toString())){ return op.apply(d); } catch (IOException e) { if (e instanceof StorageContainerException) { @@ -161,6 +170,8 @@ static T tryEachDatanode(Pipeline pipeline, } else { throw e; } + } catch (Exception e) { + LOG.warn("Unable to close trace span"); } } } @@ -211,10 +222,14 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List validators, ContainerCommandRequestProto.Builder builder, DatanodeDetails datanode) throws IOException { + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } final ContainerCommandRequestProto request = builder .setDatanodeUuid(datanode.getUuidString()).build(); ContainerCommandResponseProto response = - xceiverClient.sendCommand(request, validators); + xceiverClient.sendCommand(builder.build(), validators); return response.getGetBlock(); } @@ -246,6 +261,10 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request, getValidatorList()); @@ -319,10 +338,14 @@ public static 
ContainerProtos.ReadChunkResponseProto readChunk( builder.setEncodedToken(token.encodeToUrlString()); } - return tryEachDatanode(xceiverClient.getPipeline(), - d -> readChunk(xceiverClient, chunk, blockID, - validators, builder, d), - d -> toErrorMessage(chunk, blockID, d)); + Span span = GlobalTracer.get() + .buildSpan("readChunk").start(); + try (Scope ignored = GlobalTracer.get().activateSpan(span)) { + return tryEachDatanode(xceiverClient.getPipeline(), + d -> readChunk(xceiverClient, chunk, blockID, + validators, builder, d), + d -> toErrorMessage(chunk, blockID, d)); + } } private static ContainerProtos.ReadChunkResponseProto readChunk( @@ -330,10 +353,14 @@ private static ContainerProtos.ReadChunkResponseProto readChunk( List validators, ContainerCommandRequestProto.Builder builder, DatanodeDetails d) throws IOException { - final ContainerCommandRequestProto request = builder - .setDatanodeUuid(d.getUuidString()).build(); + ContainerCommandRequestProto.Builder requestBuilder = builder + .setDatanodeUuid(d.getUuidString()); + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + requestBuilder = requestBuilder.setTraceID(traceId); + } ContainerCommandResponseProto reply = - xceiverClient.sendCommand(request, validators); + xceiverClient.sendCommand(requestBuilder.build(), validators); final ReadChunkResponseProto response = reply.getReadChunk(); final long readLen = getLen(response); if (readLen != chunk.getLen()) { @@ -515,6 +542,11 @@ public static void createContainer(XceiverClientSpi client, if (encodedToken != null) { request.setEncodedToken(encodedToken); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + request.setTraceID(traceId); + } + request.setCmdType(ContainerProtos.Type.CreateContainer); request.setContainerID(containerID); request.setCreateContainer(createRequest.build()); @@ -544,6 +576,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, if (encodedToken != null) { request.setEncodedToken(encodedToken); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + request.setTraceID(traceId); + } client.sendCommand(request.build(), getValidatorList()); } @@ -566,6 +602,10 @@ public static void closeContainer(XceiverClientSpi client, if (encodedToken != null) { request.setEncodedToken(encodedToken); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + request.setTraceID(traceId); + } client.sendCommand(request.build(), getValidatorList()); } @@ -589,6 +629,10 @@ public static ReadContainerResponseProto readContainer( if (encodedToken != null) { request.setEncodedToken(encodedToken); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + request.setTraceID(traceId); + } ContainerCommandResponseProto response = client.sendCommand(request.build(), getValidatorList()); @@ -624,6 +668,10 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = client.sendCommand(request, getValidatorList()); @@ -694,6 +742,10 @@ public static List toValidatorList(Validator validator) { if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != 
null) { + builder.setTraceID(traceId); + } ContainerCommandRequestProto request = builder.build(); Map responses = xceiverClient.sendCommandOnAllNodes(request); @@ -719,6 +771,10 @@ public static List toValidatorList(Validator validator) { if (encodedToken != null) { request.setEncodedToken(encodedToken); } + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + request.setTraceID(traceId); + } Map responses = client.sendCommandOnAllNodes(request.build()); for (Map.Entry entry : diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 178a9919c114..f84cc55dc173 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -322,6 +322,8 @@ public void onRemoval( this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, ecReconstructExecutor); this.clientMetrics = ContainerClientMetrics.acquire(); + + TracingUtil.initTracing("client", conf); } public XceiverClientFactory getXceiverClientManager() { From 82985136d8f19638de07b58aef6d07ea0048482f Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 31 Jan 2024 21:33:17 -0800 Subject: [PATCH 02/10] HDDS-10268. [hsync] Add OpenTracing traces to client side read path. --- .../hdds/scm/ContainerClientMetrics.java | 79 ++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java index d51dfa416313..422943fff042 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.ozone.OzoneConsts; import java.util.Map; @@ -51,6 +52,11 @@ public final class ContainerClientMetrics { private MutableCounterLong totalWriteChunkCalls; @Metric private MutableCounterLong totalWriteChunkBytes; + private MutableQuantiles[] listBlockLatency; + private MutableQuantiles[] getBlockLatency; + private MutableQuantiles[] getCommittedBlockLengthLatency; + private MutableQuantiles[] readChunkLatency; + private MutableQuantiles[] getSmallFileLatency; private final Map writeChunkCallsByPipeline; private final Map writeChunkBytesByPipeline; private final Map writeChunksCallsByLeaders; @@ -84,6 +90,36 @@ private ContainerClientMetrics() { writeChunkCallsByPipeline = new ConcurrentHashMap<>(); writeChunkBytesByPipeline = new ConcurrentHashMap<>(); writeChunksCallsByLeaders = new ConcurrentHashMap<>(); + + listBlockLatency = new MutableQuantiles[3]; + getBlockLatency = new MutableQuantiles[3]; + getCommittedBlockLengthLatency = new MutableQuantiles[3]; + readChunkLatency = new MutableQuantiles[3]; + getSmallFileLatency = new MutableQuantiles[3]; + int[] intervals = {60, 300, 900}; + for (int i = 0; i < intervals.length; i++) { + int interval = intervals[i]; + listBlockLatency[i] = registry + .newQuantiles("listBlockLatency" + interval + + "s", "ListBlock latency in microseconds", "ops", + 
"latency", interval); + getBlockLatency[i] = registry + .newQuantiles("getBlockLatency" + interval + + "s", "GetBlock latency in microseconds", "ops", + "latency", interval); + getCommittedBlockLengthLatency[i] = registry + .newQuantiles("getCommittedBlockLengthLatency" + interval + + "s", "GetCommittedBlockLength latency in microseconds", + "ops", "latency", interval); + readChunkLatency[i] = registry + .newQuantiles("readChunkLatency" + interval + + "s", "ReadChunk latency in microseconds", "ops", + "latency", interval); + getSmallFileLatency[i] = registry + .newQuantiles("getSmallFileLatency" + interval + + "s", "GetSmallFile latency in microseconds", "ops", + "latency", interval); + } } public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) { @@ -111,7 +147,48 @@ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) { totalWriteChunkBytes.incr(chunkSizeBytes); } - MutableCounterLong getTotalWriteChunkBytes() { + public void addListBlockLatency(long latency) { + for (MutableQuantiles q : listBlockLatency) { + if (q != null) { + q.add(latency); + } + } + } + + public void addGetBlockLatency(long latency) { + for (MutableQuantiles q : getBlockLatency) { + if (q != null) { + q.add(latency); + } + } + } + + public void addGetCommittedBlockLengthLatency(long latency) { + for (MutableQuantiles q : getCommittedBlockLengthLatency) { + if (q != null) { + q.add(latency); + } + } + } + + public void addReadChunkLatency(long latency) { + for (MutableQuantiles q : readChunkLatency) { + if (q != null) { + q.add(latency); + } + } + } + + public void addGetSmallFileLatency(long latency) { + for (MutableQuantiles q : getSmallFileLatency) { + if (q != null) { + q.add(latency); + } + } + } + + @VisibleForTesting + public MutableCounterLong getTotalWriteChunkBytes() { return totalWriteChunkBytes; } From f02dd579c379af4ba6d615f8ffcf6f00a940ee4a Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 23 Feb 2024 10:33:53 -0800 Subject: [PATCH 03/10] Add traces to OFS file system APIs and input stream APIs. (cherry picked from commit 92df2346bd9e3280bffb9cc3f4a9c71db1e8b229) --- .../scm/storage/ContainerProtocolCalls.java | 16 +++-- .../hadoop/hdds/tracing/TracingUtil.java | 10 +++ .../fs/ozone/BasicRootedOzoneFileSystem.java | 71 ++++++++++++++----- .../hadoop/fs/ozone/OzoneFSInputStream.java | 42 ++++++++--- .../fs/ozone/RootedOzoneFileSystem.java | 12 ++++ 5 files changed, 120 insertions(+), 31 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index cfeea64b8431..2c4c9ef41939 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -151,18 +151,20 @@ static T tryEachDatanode(Pipeline pipeline, for (; ;) { final DatanodeDetails d = pipeline.getClosestNode(excluded); - try (AutoCloseable scope = TracingUtil - .createActivatedSpan(d.toString())){ + try { return op.apply(d); } catch (IOException e) { + Span span = GlobalTracer.get().activeSpan(); if (e instanceof StorageContainerException) { StorageContainerException sce = (StorageContainerException)e; // Block token expired. There's no point retrying other DN. // Throw the exception to request a new block token right away. 
if (sce.getResult() == BLOCK_TOKEN_VERIFICATION_FAILED) { + span.log("block token verification failed at DN " + d); throw e; } } + span.log("failed to connect to DN " + d); excluded.add(d); if (excluded.size() < pipeline.size()) { LOG.warn(toErrorMessage.apply(d) @@ -170,8 +172,6 @@ static T tryEachDatanode(Pipeline pipeline, } else { throw e; } - } catch (Exception e) { - LOG.warn("Unable to close trace span"); } } } @@ -341,10 +341,15 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( Span span = GlobalTracer.get() .buildSpan("readChunk").start(); try (Scope ignored = GlobalTracer.get().activateSpan(span)) { + span.setTag("offset", chunk.getOffset()) + .setTag("length", chunk.getLen()) + .setTag("block", blockID.toString()); return tryEachDatanode(xceiverClient.getPipeline(), d -> readChunk(xceiverClient, chunk, blockID, validators, builder, d), d -> toErrorMessage(chunk, blockID, d)); + } finally { + span.finish(); } } @@ -355,7 +360,8 @@ private static ContainerProtos.ReadChunkResponseProto readChunk( DatanodeDetails d) throws IOException { ContainerCommandRequestProto.Builder requestBuilder = builder .setDatanodeUuid(d.getUuidString()); - String traceId = TracingUtil.exportCurrentSpan(); + Span span = GlobalTracer.get().activeSpan(); + String traceId = TracingUtil.exportSpan(span); if (traceId != null) { requestBuilder = requestBuilder.setTraceID(traceId); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index b968d407232c..943c808f7122 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -165,6 +165,16 @@ private static R executeInSpan(Span span, } } + /** + * Execute {@code supplier} inside an activated new span. + */ + public static void executeInNewSpan(String spanName, + CheckedRunnable runnable) throws E { + Span span = GlobalTracer.get() + .buildSpan(spanName).start(); + executeInSpan(span, runnable); + } + /** * Execute {@code runnable} in the given {@code span}. 
*/ diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java index 1fcb1554b6c3..4dcdc1297553 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.ozone; import com.google.common.base.Preconditions; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CreateFlag; @@ -41,6 +43,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.ozone.OFSPath; @@ -239,7 +242,12 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { statistics.incrementReadOps(1); LOG.trace("open() path: {}", path); final String key = pathToKey(path); - return new FSDataInputStream(createFSInputStream(adapter.readFile(key))); + return TracingUtil.executeInNewSpan("ofs open", + () -> { + Span span = GlobalTracer.get().activeSpan(); + span.setTag("path", key); + return new FSDataInputStream(createFSInputStream(adapter.readFile(key))); + }); } protected InputStream createFSInputStream(InputStream inputStream) { @@ -256,14 +264,15 @@ protected void incrementCounter(Statistic statistic, long count) { @Override public FSDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, - short replication, long blockSize, - Progressable progress) throws IOException { + boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progress) throws IOException { LOG.trace("create() path:{}", f); incrementCounter(Statistic.INVOCATION_CREATE, 1); statistics.incrementWriteOps(1); final String key = pathToKey(f); - return createOutputStream(key, replication, overwrite, true); + return TracingUtil.executeInNewSpan("ofs create", + () -> createOutputStream(key, replication, overwrite, true)); } @Override @@ -277,8 +286,10 @@ public FSDataOutputStream createNonRecursive(Path path, incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1); statistics.incrementWriteOps(1); final String key = pathToKey(path); - return createOutputStream(key, - replication, flags.contains(CreateFlag.OVERWRITE), false); + return TracingUtil.executeInNewSpan("ofs createNonRecursive", + () -> + createOutputStream(key, + replication, flags.contains(CreateFlag.OVERWRITE), false)); } private OutputStream selectOutputStream(String key, short replication, @@ -374,6 +385,14 @@ boolean processKeyPath(List keyPathList) throws IOException { */ @Override public boolean rename(Path src, Path dst) throws IOException { + return TracingUtil.executeInNewSpan("ofs rename", + () -> renameInSpan(src, dst)); + } + + private boolean renameInSpan(Path src, Path dst) throws IOException { + Span span = GlobalTracer.get().activeSpan(); + span.setTag("src", src.toString()) + .setTag("dst", dst.toString()); incrementCounter(Statistic.INVOCATION_RENAME, 1); statistics.incrementWriteOps(1); if 
(src.equals(dst)) { @@ -526,8 +545,8 @@ protected void rename(final Path src, final Path dst, @Override public Path createSnapshot(Path path, String snapshotName) throws IOException { - String snapshot = getAdapter() - .createSnapshot(pathToKey(path), snapshotName); + String snapshot = TracingUtil.executeInNewSpan("ofs createSnapshot", + () -> getAdapter().createSnapshot(pathToKey(path), snapshotName)); return new Path(OzoneFSUtils.trimPathToDepth(path, PATH_DEPTH_TO_BUCKET), OM_SNAPSHOT_INDICATOR + OZONE_URI_DELIMITER + snapshot); } @@ -541,7 +560,8 @@ public void renameSnapshot(Path path, String snapshotOldName, String snapshotNew @Override public void deleteSnapshot(Path path, String snapshotName) throws IOException { - adapter.deleteSnapshot(pathToKey(path), snapshotName); + TracingUtil.executeInNewSpan("ofs deleteSnapshot", + () -> adapter.deleteSnapshot(pathToKey(path), snapshotName)); } private class DeleteIterator extends OzoneListingIterator { @@ -672,6 +692,11 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException { */ @Override public boolean delete(Path f, boolean recursive) throws IOException { + return TracingUtil.executeInNewSpan("ofs delete", + () -> deleteInSpan(f, recursive)); + } + + private boolean deleteInSpan(Path f, boolean recursive) throws IOException { incrementCounter(Statistic.INVOCATION_DELETE, 1); statistics.incrementWriteOps(1); LOG.debug("Delete path {} - recursive {}", f, recursive); @@ -889,7 +914,8 @@ private boolean o3Exists(final Path f) throws IOException { @Override public FileStatus[] listStatus(Path f) throws IOException { - return convertFileStatusArr(listStatusAdapter(f)); + return TracingUtil.executeInNewSpan("ofs listStatus", + () -> convertFileStatusArr(listStatusAdapter(f))); } private FileStatus[] convertFileStatusArr( @@ -946,7 +972,8 @@ public Path getWorkingDirectory() { @Override public Token getDelegationToken(String renewer) throws IOException { - return adapter.getDelegationToken(renewer); + return TracingUtil.executeInNewSpan("ofs getDelegationToken", + () -> adapter.getDelegationToken(renewer)); } /** @@ -1014,7 +1041,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { if (isEmpty(key)) { return false; } - return mkdir(f); + return TracingUtil.executeInNewSpan("ofs mkdirs", + () -> mkdir(f)); } @Override @@ -1025,7 +1053,8 @@ public long getDefaultBlockSize() { @Override public FileStatus getFileStatus(Path f) throws IOException { - return convertFileStatus(getFileStatusAdapter(f)); + return TracingUtil.executeInNewSpan("ofs getFileStatus", + () -> convertFileStatus(getFileStatusAdapter(f))); } public FileStatusAdapter getFileStatusAdapter(Path f) throws IOException { @@ -1096,7 +1125,8 @@ public boolean exists(Path f) throws IOException { public FileChecksum getFileChecksum(Path f, long length) throws IOException { incrementCounter(Statistic.INVOCATION_GET_FILE_CHECKSUM); String key = pathToKey(f); - return adapter.getFileChecksum(key, length); + return TracingUtil.executeInNewSpan("ofs getFileChecksum", + () -> adapter.getFileChecksum(key, length)); } @Override @@ -1508,6 +1538,11 @@ FileStatus convertFileStatus(FileStatusAdapter fileStatusAdapter) { @Override public ContentSummary getContentSummary(Path f) throws IOException { + return TracingUtil.executeInNewSpan("ofs getContentSummary", + () -> getContentSummaryInSpan(f)); + } + + private ContentSummary getContentSummaryInSpan(Path f) throws IOException { FileStatusAdapter status = getFileStatusAdapter(f); if (status.isFile()) { 
@@ -1583,7 +1618,8 @@ public void setTimes(Path f, long mtime, long atime) throws IOException { if (key.equals("NONE")) { throw new FileNotFoundException("File not found. path /NONE."); } - adapter.setTimes(key, mtime, atime); + TracingUtil.executeInNewSpan("ofs setTimes", + () -> adapter.setTimes(key, mtime, atime)); } protected boolean setSafeModeUtil(SafeModeAction action, @@ -1595,6 +1631,7 @@ protected boolean setSafeModeUtil(SafeModeAction action, statistics.incrementWriteOps(1); } LOG.trace("setSafeMode() action:{}", action); - return getAdapter().setSafeMode(action, isChecked); + return TracingUtil.executeInNewSpan("ofs setSafeMode", + () -> getAdapter().setSafeMode(action, isChecked)); } } diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java index 918640799c71..35ee20d56c34 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java @@ -23,6 +23,9 @@ import java.nio.ByteBuffer; import java.nio.ReadOnlyBufferException; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; @@ -30,6 +33,7 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.tracing.TracingUtil; /** * The input stream for Ozone file system. @@ -52,25 +56,40 @@ public OzoneFSInputStream(InputStream inputStream, Statistics statistics) { @Override public int read() throws IOException { - int byteRead = inputStream.read(); - if (statistics != null && byteRead >= 0) { - statistics.incrementBytesRead(1); + Span span = GlobalTracer.get() + .buildSpan("OzoneFSInputStream.read").start(); + try (Scope scope = GlobalTracer.get().activateSpan(span)) { + int byteRead = inputStream.read(); + if (statistics != null && byteRead >= 0) { + statistics.incrementBytesRead(1); + } + return byteRead; + } finally { + span.finish(); } - return byteRead; } @Override public int read(byte[] b, int off, int len) throws IOException { - int bytesRead = inputStream.read(b, off, len); - if (statistics != null && bytesRead >= 0) { - statistics.incrementBytesRead(bytesRead); + Span span = GlobalTracer.get() + .buildSpan("OzoneFSInputStream.read").start(); + try (Scope scope = GlobalTracer.get().activateSpan(span)) { + span.setTag("offset", off) + .setTag("length", len); + int bytesRead = inputStream.read(b, off, len); + if (statistics != null && bytesRead >= 0) { + statistics.incrementBytesRead(bytesRead); + } + return bytesRead; + } finally { + span.finish(); } - return bytesRead; } @Override public synchronized void close() throws IOException { - inputStream.close(); + TracingUtil.executeInNewSpan("OzoneFSInputStream.close", + inputStream::close); } @Override @@ -101,6 +120,11 @@ public int available() throws IOException { */ @Override public int read(ByteBuffer buf) throws IOException { + return TracingUtil.executeInNewSpan("OzoneFSInputStream.read(ByteBuffer)", + () -> readInTrace(buf)); + } + + private int readInTrace(ByteBuffer buf) throws IOException { if (buf.isReadOnly()) { throw new ReadOnlyBufferException(); } diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index 7561e20a875d..19502d0df8c2 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.ozone; +import io.opentracing.util.GlobalTracer; import org.apache.hadoop.fs.LeaseRecoverable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.SafeMode; @@ -29,6 +30,7 @@ import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.security.token.DelegationTokenIssuer; import java.io.IOException; @@ -124,6 +126,11 @@ public boolean hasPathCapability(final Path path, final String capability) @Override public boolean recoverLease(final Path f) throws IOException { + return TracingUtil.executeInNewSpan("ofs recoverLease", + () -> recoverLeaseTraced(f)); + } + private boolean recoverLeaseTraced(final Path f) throws IOException { + GlobalTracer.get().activeSpan().setTag("path", f.toString()); statistics.incrementWriteOps(1); LOG.trace("recoverLease() path:{}", f); Path qualifiedPath = makeQualified(f); @@ -133,6 +140,11 @@ public boolean recoverLease(final Path f) throws IOException { @Override public boolean isFileClosed(Path f) throws IOException { + return TracingUtil.executeInNewSpan("ofs isFileClosed", + () -> isFileClosedTraced(f)); + } + private boolean isFileClosedTraced(Path f) throws IOException { + GlobalTracer.get().activeSpan().setTag("path", f.toString()); statistics.incrementWriteOps(1); LOG.trace("isFileClosed() path:{}", f); Path qualifiedPath = makeQualified(f); From 5ed878caa7830c3e80b1db6d0e26164a49fb02c8 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 23 Feb 2024 14:55:12 -0800 Subject: [PATCH 04/10] Add trace span to OzoneFSOutputStream APIs. 
(cherry picked from commit 5fe0cc17465a6fed5ea1f21ed974659c17567035) --- .../hadoop/fs/ozone/OzoneFSOutputStream.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java index 141a40469419..c5f62d6f68ba 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java @@ -18,7 +18,10 @@ package org.apache.hadoop.fs.ozone; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import java.io.IOException; @@ -42,17 +45,24 @@ public OzoneFSOutputStream(OzoneOutputStream outputStream) { @Override public void write(int b) throws IOException { - outputStream.write(b); + TracingUtil.executeInNewSpan("OzoneFSOutputStream.write", + () -> outputStream.write(b)); } @Override public void write(byte[] b, int off, int len) throws IOException { - outputStream.write(b, off, len); + TracingUtil.executeInNewSpan("OzoneFSOutputStream.write", + () -> { + Span span = GlobalTracer.get().activeSpan(); + span.setTag("length", len); + outputStream.write(b, off, len); + }); } @Override public synchronized void flush() throws IOException { - outputStream.flush(); + TracingUtil.executeInNewSpan("OzoneFSOutputStream.flush", + outputStream::flush); } @Override @@ -67,7 +77,8 @@ public void hflush() throws IOException { @Override public void hsync() throws IOException { - outputStream.hsync(); + TracingUtil.executeInNewSpan("OzoneFSOutputStream.hsync", + outputStream::hsync); } protected OzoneOutputStream getWrappedOutputStream() { From 0f897e99cd41e7096b6b64f3c15c74ee494bcd2d Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 23 Feb 2024 15:50:08 -0800 Subject: [PATCH 05/10] Fix compilation error. --- .../org/apache/hadoop/hdds/tracing/TracingUtil.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 943c808f7122..b968d407232c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -165,16 +165,6 @@ private static R executeInSpan(Span span, } } - /** - * Execute {@code supplier} inside an activated new span. - */ - public static void executeInNewSpan(String spanName, - CheckedRunnable runnable) throws E { - Span span = GlobalTracer.get() - .buildSpan(spanName).start(); - executeInSpan(span, runnable); - } - /** * Execute {@code runnable} in the given {@code span}. 
*/ From c2c69f1b136319c0664b7a7e02225ba5cb25616a Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 15 Mar 2024 14:40:35 -0700 Subject: [PATCH 06/10] Fix compilation error Change-Id: I8f98bfd7cbe6857b017c16823a73eae6f2592c58 --- .../org/apache/hadoop/hdds/tracing/TracingUtil.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index b968d407232c..491b9fff4cca 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -139,6 +139,16 @@ public static boolean isTracingEnabled( ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT); } + /** + * Execute {@code supplier} inside an activated new span. + */ + public static void executeInNewSpan(String spanName, + CheckedRunnable runnable) throws E { + Span span = GlobalTracer.get() + .buildSpan(spanName).start(); + executeInSpan(span, runnable); + } + /** * Execute {@code supplier} inside an activated new span. */ From 2620774d7ec3131c4df8289e75569994ad8f8364 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 15 Mar 2024 14:42:46 -0700 Subject: [PATCH 07/10] Javadoc Change-Id: I9c23acda58dfdd07508e1f7742865dc8679156e4 --- .../main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 491b9fff4cca..29bd847319ea 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -140,7 +140,7 @@ public static boolean isTracingEnabled( } /** - * Execute {@code supplier} inside an activated new span. + * Execute {@code runnable} inside an activated new span. */ public static void executeInNewSpan(String spanName, CheckedRunnable runnable) throws E { From 65b80d6f9adfa1b4b33061f970e547d45be5d649 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 15 Mar 2024 15:25:52 -0700 Subject: [PATCH 08/10] Fix findbugs. 
Change-Id: I06a8f5a0342ed53e08174090a4922f8785e06a0d --- .../apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 2c4c9ef41939..72754d1f1cf6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -229,7 +229,7 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, final ContainerCommandRequestProto request = builder .setDatanodeUuid(datanode.getUuidString()).build(); ContainerCommandResponseProto response = - xceiverClient.sendCommand(builder.build(), validators); + xceiverClient.sendCommand(request, validators); return response.getGetBlock(); } From 492d46733faeb3badb811839d2c2809924925a0d Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 9 Apr 2024 14:54:04 -0700 Subject: [PATCH 09/10] Update hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com> --- .../java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index 19502d0df8c2..c377128d2940 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -143,6 +143,7 @@ public boolean isFileClosed(Path f) throws IOException { return TracingUtil.executeInNewSpan("ofs isFileClosed", () -> isFileClosedTraced(f)); } + private boolean isFileClosedTraced(Path f) throws IOException { GlobalTracer.get().activeSpan().setTag("path", f.toString()); statistics.incrementWriteOps(1); From 32a0f612fe231f252d4cf37502895652a0b71e63 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 9 Apr 2024 14:58:25 -0700 Subject: [PATCH 10/10] Remove extra leading spaces. Change-Id: I6888d0ba3fa8366f04e1d065a83b8f7af5c134ff --- .../apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java index 4dcdc1297553..3ba291ae0fd0 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java @@ -264,9 +264,9 @@ protected void incrementCounter(Statistic statistic, long count) { @Override public FSDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, - short replication, long blockSize, - Progressable progress) throws IOException { + boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progress) throws IOException { LOG.trace("create() path:{}", f); incrementCounter(Statistic.INVOCATION_CREATE, 1); statistics.incrementWriteOps(1);
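
The series above follows one pattern on the client read path: start an OpenTracing span around the operation, activate it, tag it with the offset, length, and block being read, export the active span's context into the request's existing traceID field so the datanode side can continue the same trace, log retry decisions on the span, and finish the span in a finally block. Tracing is initialized once per client via TracingUtil.initTracing("client", conf) (patch 01) and is gated by the HDDS tracing-enabled setting checked in TracingUtil.isTracingEnabled. The sketch below restates that pattern outside the patched classes; it is illustrative only, the ReadTraceSketch and ChunkReader names are invented, and the callback merely stands in for the real XceiverClientSpi.sendCommand(...) call.

import java.io.IOException;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.tracing.TracingUtil;

/** Invented example class, not part of the patches above. */
final class ReadTraceSketch {

  private ReadTraceSketch() { }

  /**
   * Wraps a chunk read in a "readChunk" span the way patch 03 does and shows
   * where the exported span context would be handed to the outgoing request
   * (the patches call builder.setTraceID(traceId) when it is non-null).
   */
  static byte[] tracedReadChunk(long offset, long len, String blockId,
      ChunkReader reader) throws IOException {
    Span span = GlobalTracer.get().buildSpan("readChunk").start();
    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
      span.setTag("offset", offset)
          .setTag("length", len)
          .setTag("block", blockId);
      // Export the active span so the server-side handler can join the trace;
      // in the patches this string is set on ContainerCommandRequestProto.
      String traceId = TracingUtil.exportCurrentSpan();
      try {
        return reader.read(offset, len, traceId);
      } catch (IOException e) {
        // Patch 03 records failed datanode attempts on the active span like this.
        span.log("failed to read chunk for block " + blockId);
        throw e;
      }
    } finally {
      span.finish();
    }
  }

  /** Hypothetical callback standing in for the real datanode RPC. */
  interface ChunkReader {
    byte[] read(long offset, long len, String traceId) throws IOException;
  }
}

Reusing the existing traceID field of ContainerCommandRequestProto keeps the wire format unchanged while letting server-side spans attach to the client trace, and the latency quantiles added to ContainerClientMetrics in patch 02 give a coarse view of the same read operations when no tracer is attached.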