From 96afc1514137c4bcd3ae21b966ab8b10313011df Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 12 Jan 2023 15:10:49 +0100 Subject: [PATCH] measured input stream --- .../tez/http/MeasuredDataInputStream.java | 41 +++++++++++++++++++ .../library/common/shuffle/Fetcher.java | 3 +- .../orderedgrouped/FetcherOrderedGrouped.java | 3 +- 3 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 tez-runtime-library/src/main/java/org/apache/tez/http/MeasuredDataInputStream.java diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/MeasuredDataInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/http/MeasuredDataInputStream.java new file mode 100644 index 0000000000..3959f13746 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/MeasuredDataInputStream.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.http;
+
+import java.io.DataInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DataInputStream} that accumulates the time spent inside read and skip calls
+ * on the stream it wraps.
+ *
+ * <p>Most {@code DataInputStream} methods ({@code read(byte[])}, {@code read(byte[], int, int)},
+ * {@code readFully}, {@code readInt}, {@code skipBytes}, ...) are {@code final} and call the
+ * underlying stream directly, so overriding {@code read()} on this class alone would miss them.
+ * The timing is therefore done by a private {@link FilterInputStream} placed between this
+ * class and the wrapped stream; every call that reaches the wrapped stream passes through it.
+ *
+ * <p>Instances keep a plain (unsynchronized) counter and are not safe for concurrent use.
+ */
+public class MeasuredDataInputStream extends DataInputStream {
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  /** The timing layer handed to {@code super}; kept here to read the counter back. */
+  private final TimedInputStream timed;
+
+  /**
+   * Wraps the given stream so that time spent reading from it is measured.
+   *
+   * @param in the stream to read from and to measure
+   */
+  public MeasuredDataInputStream(DataInputStream in) {
+    this(new TimedInputStream(in));
+  }
+
+  // Constructor chaining lets us keep a typed reference to the timing layer
+  // that is passed to the superclass.
+  private MeasuredDataInputStream(TimedInputStream timed) {
+    super(timed);
+    this.timed = timed;
+  }
+
+  /**
+   * Returns the accumulated time spent in read and skip calls on the wrapped stream.
+   *
+   * @return the elapsed time in whole milliseconds (sub-millisecond remainder is truncated)
+   */
+  public long getElapsedTimeMs() {
+    return timed.elapsedNanos / NANOS_PER_MILLI;
+  }
+
+  /** Delegating stream that adds the duration of each read/skip to a running total. */
+  private static final class TimedInputStream extends FilterInputStream {
+    private long elapsedNanos;
+
+    TimedInputStream(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public int read() throws IOException {
+      final long start = System.nanoTime();
+      try {
+        return in.read();
+      } finally {
+        // finally: time spent in a read that throws is still counted
+        elapsedNanos += System.nanoTime() - start;
+      }
+    }
+
+    // FilterInputStream.read(byte[]) forwards to this method, so it is covered as well.
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      final long start = System.nanoTime();
+      try {
+        return in.read(b, off, len);
+      } finally {
+        elapsedNanos += System.nanoTime() - start;
+      }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      final long start = System.nanoTime();
+      try {
+        return in.skip(n);
+      } finally {
+        elapsedNanos += System.nanoTime() - start;
+      }
+    }
+  }
+}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index eb34ec2993..1f46e057db 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.MeasuredDataInputStream;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -563,7 +564,7 @@ private HostFetchResult setupConnection(Collection attem
   protected void setupConnectionInternal(String host, Collection attempts) throws IOException, InterruptedException {
-    input = httpConnection.getInputStream();
+    input = new MeasuredDataInputStream(httpConnection.getInputStream());
     httpConnection.validate();
   }
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 5887dcb3ca..3233994094 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -33,6 +33,7 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.http.MeasuredDataInputStream; import org.apache.tez.common.CallableWithNdc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -388,7 +389,7 @@ boolean setupConnection(MapHost host, Collection attempt protected void setupConnectionInternal(MapHost host, Collection attempts) throws IOException, InterruptedException { - input = httpConnection.getInputStream(); + input = new MeasuredDataInputStream(httpConnection.getInputStream()); httpConnection.validate(); }