From 75e825d39ef50c08fe9134e0db6833919675745c Mon Sep 17 00:00:00 2001 From: Seonggon Namgung Date: Fri, 19 Jan 2024 10:59:08 +0900 Subject: [PATCH 1/3] Introduce IAI.include() to compare Composite IAI and single IAI properly --- .../CompositeInputAttemptIdentifier.java | 8 ++++- .../common/InputAttemptIdentifier.java | 6 +++- .../impl/ShuffleInputEventHandlerImpl.java | 1 + .../common/shuffle/impl/ShuffleManager.java | 35 ++++++++++++++++--- .../orderedgrouped/ShuffleScheduler.java | 14 +++++++- 5 files changed, 57 insertions(+), 7 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java index 30295bd399..7f5c94b41b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java @@ -50,6 +50,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) { return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); } + public boolean include(int thatInputIdentifier, int thatAttemptNumber) { + return + super.getInputIdentifier() <= thatInputIdentifier && thatInputIdentifier < (super.getInputIdentifier() + inputIdentifierCount) && + super.getAttemptNumber() == thatAttemptNumber; + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. @Override public int hashCode() { @@ -63,6 +69,6 @@ public boolean equals(Object obj) { @Override public String toString() { - return super.toString(); + return super.toString() + ", count=" + inputIdentifierCount; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index 16172e1daf..ce82e75d8a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -108,6 +108,10 @@ public boolean canRetrieveInputInChunks() { (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()); } + public boolean include(int thatInputIdentifier, int thatAttemptNumber) { + return this.inputIdentifier == thatInputIdentifier && this.attemptNumber == thatAttemptNumber; + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. @Override public int hashCode() { @@ -139,6 +143,6 @@ public boolean equals(Object obj) { public String toString() { return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier + ", attemptNumber=" + attemptNumber + ", pathComponent=" - + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]"; + + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]"; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 4f42f57a1e..1394a57ad3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload, private void processInputFailedEvent(InputFailedEvent ife) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); + LOG.info("Marking obsolete input: " + inputContext.getSourceVertexName() + " " + srcAttemptIdentifier); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 769ac68f7e..4c051d3310 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -583,8 +583,15 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { } else { alreadyCompleted = completedInputSet.get(input.getInputIdentifier()); } - // Avoid adding attempts which have already completed or have been marked as OBSOLETE - if (alreadyCompleted || obsoletedInputs.contains(input)) { + + // Avoid adding attempts which have already completed + if (alreadyCompleted) { + inputIter.remove(); + continue; + } + // Avoid adding attempts which have been marked as OBSOLETE + if (isObsoleteInputAttemptIdentifier(input)) { + LOG.info("Skipping obsolete input: " + input); inputIter.remove(); continue; } @@ -949,10 +956,14 @@ public void fetchFailed(String host, // TODO NEWTEZ. Implement logic to report fetch failures after a threshold. // For now, reporting immediately. InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier(); + if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) { + LOG.info("Do not report obsolete input: " + srcAttemptIdentifier); + return; + } LOG.info( - "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, " + "{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, " + "local fetch: {}, remote fetch failure reported as local failure: {})", - sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, + sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed, inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource()); failedShufflesCounter.increment(1); inputContext.notifyProgress(); @@ -984,6 +995,22 @@ public void fetchFailed(String host, } } } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + if (input == null) { + return false; + } + InputAttemptIdentifier obsoleteInput; + Iterator obsoleteInputsIter = obsoletedInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) { + return true; + } + } + return false; + } + /////////////////// End of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index f68ab948ba..8b793b4788 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1175,7 +1175,19 @@ private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) { } else { isInputFinished = isInputFinished(id.getInputIdentifier()); } - return !obsoleteInputs.contains(id) && !isInputFinished; + return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished; + } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + InputAttemptIdentifier obsoleteInput; + Iterator obsoleteInputsIter = obsoleteInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) { + return true; + } + } + return false; } public synchronized List getMapsForHost(MapHost host) { From bf3a819f22b234221d55441c18e1de3f03fddd38 Mon Sep 17 00:00:00 2001 From: seonggon Date: Mon, 23 Dec 2024 17:21:13 +0900 Subject: [PATCH 2/3] Address review comments --- .../common/CompositeInputAttemptIdentifier.java | 11 +++++++---- .../library/common/InputAttemptIdentifier.java | 12 ++++++++++-- .../shuffle/impl/ShuffleInputEventHandlerImpl.java | 2 +- .../library/common/shuffle/impl/ShuffleManager.java | 12 +++--------- .../shuffle/orderedgrouped/ShuffleScheduler.java | 2 +- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java index 7f5c94b41b..e07e687664 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common; +import com.google.common.collect.Range; import org.apache.hadoop.classification.InterfaceAudience.Private; /** @@ -50,10 +51,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) { return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); } - public boolean include(int thatInputIdentifier, int thatAttemptNumber) { - return - super.getInputIdentifier() <= thatInputIdentifier && thatInputIdentifier < (super.getInputIdentifier() + inputIdentifierCount) && - super.getAttemptNumber() == thatAttemptNumber; + public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) { + Range inputRange = + Range.closedOpen(super.getInputIdentifier(), super.getInputIdentifier() + inputIdentifierCount); + + return inputRange.contains(thatInputAttemptIdentifier.getInputIdentifier()) && + super.getAttemptNumber() == thatInputAttemptIdentifier.getAttemptNumber(); } // PathComponent & shared does not need to be part of the hashCode and equals computation. diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index ce82e75d8a..d1d5aeda1a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -108,8 +108,16 @@ public boolean canRetrieveInputInChunks() { (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()); } - public boolean include(int thatInputIdentifier, int thatAttemptNumber) { - return this.inputIdentifier == thatInputIdentifier && this.attemptNumber == thatAttemptNumber; + /** + * Checks whether this InputAttemptIdentifier includes the given InputAttemptIdentifier. + * It is used when we obsolete InputAttemptIdentifiers that include a FetchFailure reported one. + * + * @param thatInputAttemptIdentifier The InputAttemptIdentifier to check for inclusion. + * @return True if the current identifier includes the given one, false otherwise. + */ + public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) { + return this.inputIdentifier == thatInputAttemptIdentifier.getInputIdentifier() && + this.attemptNumber == thatInputAttemptIdentifier.getAttemptNumber(); } // PathComponent & shared does not need to be part of the hashCode and equals computation. diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 1394a57ad3..56b8cd4a08 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -282,7 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload, private void processInputFailedEvent(InputFailedEvent ife) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); - LOG.info("Marking obsolete input: " + inputContext.getSourceVertexName() + " " + srcAttemptIdentifier); + LOG.info("Marking obsolete input: {} {}", inputContext.getSourceVertexName(), srcAttemptIdentifier); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 4c051d3310..646194c6d7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -584,14 +584,8 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { alreadyCompleted = completedInputSet.get(input.getInputIdentifier()); } - // Avoid adding attempts which have already completed - if (alreadyCompleted) { - inputIter.remove(); - continue; - } - // Avoid adding attempts which have been marked as OBSOLETE - if (isObsoleteInputAttemptIdentifier(input)) { - LOG.info("Skipping obsolete input: " + input); + // Avoid adding attempts which have already completed or have been marked as OBSOLETE + if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) { inputIter.remove(); continue; } @@ -1004,7 +998,7 @@ private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { Iterator obsoleteInputsIter = obsoletedInputs.iterator(); while (obsoleteInputsIter.hasNext()) { obsoleteInput = obsoleteInputsIter.next(); - if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) { + if (input.includes(obsoleteInput)) { return true; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 8b793b4788..3fc7d63059 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1183,7 +1183,7 @@ private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { Iterator obsoleteInputsIter = obsoleteInputs.iterator(); while (obsoleteInputsIter.hasNext()) { obsoleteInput = obsoleteInputsIter.next(); - if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) { + if (input.includes(obsoleteInput)) { return true; } } From 094e4f50864df65118c518778f41773e7c1b28b8 Mon Sep 17 00:00:00 2001 From: seonggon Date: Wed, 25 Dec 2024 14:22:34 +0900 Subject: [PATCH 3/3] Add a unit test for InputAttemptIdentifier.incldues() --- .../library/common/TestInputIdentifiers.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java index 6b82a9d27d..5eb3b5030a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java @@ -41,4 +41,23 @@ public void testInputAttemptIdentifier() { Assert.assertTrue(set.add(i4)); } + @Test(timeout = 5000) + public void testInputAttemptIdentifierIncludes() { + InputAttemptIdentifier inputData0Attempt0 = new InputAttemptIdentifier(0, 0); + InputAttemptIdentifier inputData1Attempt0 = new InputAttemptIdentifier(1, 0); + InputAttemptIdentifier inputData2Attempt0 = new InputAttemptIdentifier(2, 0); + InputAttemptIdentifier inputData3Attempt0 = new InputAttemptIdentifier(3, 0); + InputAttemptIdentifier inputData1Attempt1 = new InputAttemptIdentifier(1, 1); + CompositeInputAttemptIdentifier inputData12Attempt0 = new CompositeInputAttemptIdentifier(1, 0, null, 2); + + Assert.assertTrue(inputData1Attempt0.includes(inputData1Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData1Attempt1)); + + Assert.assertFalse(inputData12Attempt0.includes(inputData0Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData1Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData3Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData1Attempt1)); + } }