From e129d6741e282a0feba40735bff21c628be13b7a Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Fri, 10 Apr 2020 08:26:43 +0200 Subject: [PATCH 1/2] TEZ-4338: Tez should consider node information to realize OUTPUT_LOST as early as possible - upstream(mapper) problems Change-Id: I8907507b0527a65e6492fb7d83ff55b0f17bd7a4 --- .../apache/tez/dag/api/TezConfiguration.java | 15 +++ .../api/events/InputReadErrorEvent.java | 29 ++++-- tez-api/src/main/proto/Events.proto | 1 + .../org/apache/tez/dag/app/dag/Vertex.java | 5 + .../event/TaskAttemptEventOutputFailed.java | 4 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 15 ++- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 83 ++++++++++------ .../tez/dag/app/dag/impl/VertexImpl.java | 27 ++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 64 ++++++++++++- .../apache/tez/runtime/api/impl/TezEvent.java | 4 +- .../library/api/TezRuntimeConfiguration.java | 26 ++++- .../library/common/shuffle/Fetcher.java | 33 +++++-- .../shuffle/FetcherErrorTestingConfig.java | 95 +++++++++++++++++++ .../shuffle/FetcherWithInjectableErrors.java | 85 +++++++++++++++++ .../common/shuffle/impl/ShuffleManager.java | 14 ++- .../orderedgrouped/FetcherOrderedGrouped.java | 13 ++- ...herOrderedGroupedWithInjectableErrors.java | 66 +++++++++++++ .../orderedgrouped/ShuffleScheduler.java | 30 ++++-- 18 files changed, 533 insertions(+), 76 deletions(-) create mode 100644 tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java create mode 100644 tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java create mode 100644 tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 17a826e7e7..2ae1887682 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -300,6 +300,21 @@ public TezConfiguration(boolean loadDefaults) { TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error"; public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300; + /** + * int value. The maximum number of distinct downstream hosts that can report a fetch failure + * for a single upstream host before the upstream task attempt is marked as failed (so blamed for + * the fetch failure). E.g. if this set to 1, in case of 2 different hosts reporting fetch failure + * for the same upstream host the upstream task is immediately blamed for the fetch failure. + * TODO: could this be proportional to the number of hosts running consumer/downstream tasks ? + * + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE = + TEZ_AM_PREFIX + "max.allowed.downstream.hosts.reporting.fetch.failure"; + public static final int TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE_DEFAULT = 1; + /** * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an * output specific operation and typically involves making the output visible for consumption. diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java index 8ef50ebac4..1d0c44d6c1 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java @@ -62,8 +62,13 @@ public final class InputReadErrorEvent extends Event { */ private final boolean isDiskErrorAtSource; + /** + * The localhostName of the destination task attempt. + */ + private final String destinationLocalhostName; + private InputReadErrorEvent(final String diagnostics, final int index, final int version, - final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) { + final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource, String destinationLocalhostName) { super(); this.diagnostics = diagnostics; this.index = index; @@ -71,24 +76,30 @@ private InputReadErrorEvent(final String diagnostics, final int index, final int this.numFailures = numFailures; this.isLocalFetch = isLocalFetch; this.isDiskErrorAtSource = isDiskErrorAtSource; + this.destinationLocalhostName = destinationLocalhostName; } public static InputReadErrorEvent create(String diagnostics, int index, int version, boolean isLocalFetch, boolean isDiskErrorAtSource) { - return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource); + return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource, null); } public static InputReadErrorEvent create(String diagnostics, int index, int version) { - return create(diagnostics, index, version, 1, false, false); + return create(diagnostics, index, version, 1, false, false, null); + } + + public static InputReadErrorEvent create(String diagnostics, int index, int version, boolean isLocalFetch, + boolean isDiskErrorAtSource, String destinationLocalhostName) { + return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource, destinationLocalhostName); } /** * Create an InputReadErrorEvent. */ - public static InputReadErrorEvent create(final String diagnostics, final int index, - final int version, final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) { - return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch, - isDiskErrorAtSource); + public static InputReadErrorEvent create(final String diagnostics, final int index, final int version, + final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource, String destinationLocalhostName) { + return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch, isDiskErrorAtSource, + destinationLocalhostName); } public String getDiagnostics() { @@ -118,6 +129,10 @@ public boolean isDiskErrorAtSource() { return isDiskErrorAtSource; } + public String getDestinationLocalhostName(){ + return destinationLocalhostName; + } + @Override public int hashCode() { return Objects.hash(index, version); diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index e041c33f60..9949b0bc8c 100644 --- a/tez-api/src/main/proto/Events.proto +++ b/tez-api/src/main/proto/Events.proto @@ -41,6 +41,7 @@ message InputReadErrorEventProto { optional int32 version = 3; optional bool is_local_fetch = 4; optional bool is_disk_error_at_source = 5; + optional string destination_localhost_name = 6; } message InputFailedEventProto { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index ba3079d4e1..c03b6486a3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -230,8 +230,13 @@ interface VertexConfig { * @return tez.am.max.allowed.time-sec.for-read-error. */ int getMaxAllowedTimeForTaskReadErrorSec(); + /** + * @return tez.am.max.allowed.downstream.hosts.reporting.fetch.failure. + */ + int getMaxAllowedDownstreamHostsReportingFetchFailure(); } void incrementRejectedTaskAttemptCount(); int getRejectedTaskAttemptCount(); + Map> getDownstreamBlamingHosts(); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java index 6bc110a1a0..fbdd2305c7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java @@ -28,9 +28,9 @@ public class TaskAttemptEventOutputFailed extends TaskAttemptEvent private TezEvent inputFailedEvent; private int consumerTaskNumber; - public TaskAttemptEventOutputFailed(TezTaskAttemptID attemptId, + public TaskAttemptEventOutputFailed(TezTaskAttemptID sourceTaskAttemptId, TezEvent tezEvent, int numConsumers) { - super(attemptId, TaskAttemptEventType.TA_OUTPUT_FAILED); + super(sourceTaskAttemptId, TaskAttemptEventType.TA_OUTPUT_FAILED); this.inputFailedEvent = tezEvent; this.consumerTaskNumber = numConsumers; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 848b49199d..99b56fbf07 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -374,7 +374,7 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept if (!bufferEvents.get()) { switch (tezEvent.getEventType()) { case INPUT_READ_ERROR_EVENT: - InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent(); + InputReadErrorEvent inputReadErrorEvent = (InputReadErrorEvent) tezEvent.getEvent(); TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo() .getTaskAttemptID(); int destTaskIndex = destAttemptId.getTaskID().getId(); @@ -383,10 +383,10 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept try { if (onDemandRouting) { srcTaskIndex = ((EdgeManagerPluginOnDemand) edgeManager).routeInputErrorEventToSource( - destTaskIndex, event.getIndex()); + destTaskIndex, inputReadErrorEvent.getIndex()); } else { - srcTaskIndex = edgeManager.routeInputErrorEventToSource(event, - destTaskIndex, event.getIndex()); + srcTaskIndex = edgeManager.routeInputErrorEventToSource(inputReadErrorEvent, + destTaskIndex, inputReadErrorEvent.getIndex()); } Preconditions.checkArgument(srcTaskIndex >= 0, "SourceTaskIndex should not be negative," @@ -414,11 +414,10 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept " edgeManager=" + edgeManager.getClass().getName()); } TezTaskID srcTaskId = srcTask.getTaskId(); - int taskAttemptIndex = event.getVersion(); + int srcTaskAttemptIndex = inputReadErrorEvent.getVersion(); TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTaskId, - taskAttemptIndex); - sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId, - tezEvent, numConsumers)); + srcTaskAttemptIndex); + sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId, tezEvent, numConsumers)); break; default: throw new TezUncheckedException("Unhandled tez event type: " diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 593ea6cabf..f215d8757f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -194,7 +194,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private Container container; private long allocationTime; private ContainerId containerId; - private NodeId containerNodeId; + protected NodeId containerNodeId; private String nodeHttpAddress; private String nodeRackName; @@ -1793,80 +1793,107 @@ protected static class OutputReportedFailedTransition implements MultipleArcTransition { @Override - public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, + public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt, TaskAttemptEvent event) { TaskAttemptEventOutputFailed outputFailedEvent = (TaskAttemptEventOutputFailed) event; - TezEvent tezEvent = outputFailedEvent.getInputFailedEvent(); - TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID(); - InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent(); + TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent(); + TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID(); + + InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent(); int failedInputIndexOnDestTa = readErrorEvent.getIndex(); - if (readErrorEvent.getVersion() != attempt.getID().getId()) { - throw new TezUncheckedException(attempt.getID() + + if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) { + throw new TezUncheckedException(sourceAttempt.getID() + " incorrectly blamed for read error from " + failedDestTaId + " at inputIndex " + failedInputIndexOnDestTa + " version" + readErrorEvent.getVersion()); } - LOG.info(attempt.getID() - + " blamed for read error from " + failedDestTaId - + " at inputIndex " + failedInputIndexOnDestTa); - long time = attempt.clock.getTime(); - Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId); + // source host: where the data input is supposed to come from + String sHost = sourceAttempt.getNodeId().getHost(); + // destination: where the data is tried to be fetched to + String dHost = readErrorEvent.getDestinationLocalhostName(); + + LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(), + sHost, failedDestTaId, dHost, failedInputIndexOnDestTa); + + boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false; + Map> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts(); + if (!downstreamBlamingHosts.containsKey(sHost)) { + LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost); + downstreamBlamingHosts.put(sHost, new HashSet()); + } + downstreamBlamingHosts.get(sHost).add(dHost); + int currentNumberOfFailingDownstreamHosts = downstreamBlamingHosts.get(sHost).size(); + if (currentNumberOfFailingDownstreamHosts > sourceAttempt.getVertex().getVertexConfig() + .getMaxAllowedDownstreamHostsReportingFetchFailure()) { + LOG.info("Host will be marked fail: {} because of {} distinct upstream hosts having fetch failures", sHost, + currentNumberOfFailingDownstreamHosts); + tooManyDownstreamHostsBlamedTheSameUpstreamHost = true; + } + + long time = sourceAttempt.clock.getTime(); + + Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId); if (firstErrReportTime == null) { - attempt.uniquefailedOutputReports.put(failedDestTaId, time); + sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time); firstErrReportTime = time; } - int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig() + int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig() .getMaxAllowedOutputFailures(); - int maxAllowedTimeForTaskReadErrorSec = attempt.getVertex() + int maxAllowedTimeForTaskReadErrorSec = sourceAttempt.getVertex() .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec(); - double maxAllowedOutputFailuresFraction = attempt.getVertex() + double maxAllowedOutputFailuresFraction = sourceAttempt.getVertex() .getVertexConfig().getMaxAllowedOutputFailuresFraction(); int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000); boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec; - int runningTasks = attempt.appContext.getCurrentDAG().getVertex( + int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex( failedDestTaId.getTaskID().getVertexID()).getRunningTasks(); - float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0; + float failureFraction = + runningTasks > 0 ? ((float) sourceAttempt.uniquefailedOutputReports.size()) / runningTasks : 0; boolean withinFailureFractionLimits = (failureFraction <= maxAllowedOutputFailuresFraction); boolean withinOutputFailureLimits = - (attempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures); + (sourceAttempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures); // If needed we can launch a background task without failing this task // to generate a copy of the output just in case. // If needed we can consider only running consumer tasks if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits - && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())) { - return attempt.getInternalState(); + && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource()) + && !tooManyDownstreamHostsBlamedTheSameUpstreamHost) { + return sourceAttempt.getInternalState(); } - String message = attempt.getID() + " being failed for too many output errors. " + String message = sourceAttempt.getID() + " being failed for too many output errors. " + "failureFraction=" + failureFraction + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + maxAllowedOutputFailuresFraction - + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + + ", uniquefailedOutputReports=" + sourceAttempt.uniquefailedOutputReports.size() + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + maxAllowedTimeForTaskReadErrorSec + + ", tooManyDownstreamHostsBlamedTheSameUpstreamHost: " + tooManyDownstreamHostsBlamedTheSameUpstreamHost + + " ("+currentNumberOfFailingDownstreamHosts+")" + ", readErrorTimespan=" + readErrorTimespanSec + ", isLocalFetch=" + readErrorEvent.isLocalFetch() + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource(); LOG.info(message); - attempt.addDiagnosticInfo(message); + sourceAttempt.addDiagnosticInfo(message); // send input failed event - attempt.sendInputFailedToConsumers(); + sourceAttempt.sendInputFailedToConsumers(); // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks. - if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) { + if (sourceAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) { (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition( - attempt, event); + sourceAttempt, event); return TaskAttemptStateInternal.FAILED; } else { (new TerminatedWhileRunningTransition(FAILED_HELPER)).transition( - attempt, event); + sourceAttempt, event); return TaskAttemptStateInternal.FAIL_IN_PROGRESS; } // TODO at some point. Nodes may be interested in FetchFailure info. diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 912339e15e..a40166a321 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -264,6 +264,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl final ServicePluginInfo servicePluginInfo; + /* + * For every upstream host (as map keys) contains every unique downstream hostnames that reported INPUT_READ_ERROR. + * This map helps to decide if there is a problem with the host that produced the map outputs. There is an assumption + * that if multiple downstream hosts report input errors for the same upstream host, then it's likely that the output + * has to be blamed and needs to rerun. + */ + private final Map> downstreamBlamingHosts = Maps.newHashMap(); private final float maxFailuresPercent; private boolean logSuccessDiagnostics = false; @@ -4833,6 +4840,10 @@ static class VertexConfigImpl implements VertexConfig { * See tez.am.max.allowed.time-sec.for-read-error. */ private final int maxAllowedTimeForTaskReadErrorSec; + /** + * See tez.am.max.allowed.downstream.hosts.reporting.fetch.failure. + */ + private final int maxAllowedDownstreamHostsReportingFetchFailure; public VertexConfigImpl(Configuration conf) { this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, @@ -4857,6 +4868,10 @@ public VertexConfigImpl(Configuration conf) { this.maxAllowedTimeForTaskReadErrorSec = conf.getInt( TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); + + this.maxAllowedDownstreamHostsReportingFetchFailure = conf.getInt( + TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE, + TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE_DEFAULT); } @Override @@ -4899,8 +4914,20 @@ public boolean getTaskRescheduleRelaxedLocality() { @Override public int getMaxAllowedTimeForTaskReadErrorSec() { return maxAllowedTimeForTaskReadErrorSec; } + + /** + * @return maxAllowedDownstreamHostsReportingFetchFailure. + */ + @Override public int getMaxAllowedDownstreamHostsReportingFetchFailure() { + return maxAllowedDownstreamHostsReportingFetchFailure; + } } @Override public AbstractService getSpeculator() { return speculator; } + + @Override + public Map> getDownstreamBlamingHosts(){ + return downstreamBlamingHosts; + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 6862bec2ee..5096deba28 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -131,6 +131,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttempt { @@ -174,6 +176,7 @@ public void setupTest() { private void createMockVertex(Configuration conf) { mockVertex = mock(Vertex.class); + when(mockVertex.getDownstreamBlamingHosts()).thenReturn(Maps.newHashMap()); when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); when(mockVertex.getVertexConfig()).thenReturn( new VertexImpl.VertexConfigImpl(conf)); @@ -2173,11 +2176,11 @@ private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch, TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl sourceAttempt = new MockTaskAttemptImpl(taskID, 1, eventHandler, null, new Configuration(), SystemClock.getInstance(), mock(TaskHeartbeatHandler.class), appCtx, - false, null, null, false); + false, null, null, false).setNodeId(NodeId.newInstance("somehost", 0)); // the original read error event, sent by reducer task InputReadErrorEvent inputReadErrorEvent = - InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource); + InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource, null); TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class); when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class)); when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class)); @@ -2201,6 +2204,54 @@ private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch, Assert.assertEquals(expectedState, resultState); } + @Test + public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() { + EventHandler eventHandler = mock(EventHandler.class); + TezTaskID taskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); + TaskAttemptImpl sourceAttempt = new MockTaskAttemptImpl(taskID, 1, eventHandler, null, new Configuration(), + SystemClock.getInstance(), mock(TaskHeartbeatHandler.class), appCtx, false, null, null, false) + .setNodeId(NodeId.newInstance("somehost", 0)); + + // input read error events from 2 different hosts + InputReadErrorEvent inputReadErrorEvent1 = + InputReadErrorEvent.create("", 0, 1, 1, false, false, "downstream_host_1"); + InputReadErrorEvent inputReadErrorEvent2 = + InputReadErrorEvent.create("", 1, 1, 1, false, false, "downstream_host_2"); + + TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class); + when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class)); + when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class)); + when(appCtx.getCurrentDAG()).thenReturn(mock(DAG.class)); + when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class))).thenReturn(mock(Vertex.class)); + when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)).getRunningTasks()).thenReturn(100); + + EventMetaData mockMeta = mock(EventMetaData.class); + when(mockMeta.getTaskAttemptID()).thenReturn(destTaskAttemptId); + + // mapper task succeeded earlier + sourceAttempt.handle(new TaskAttemptEvent(sourceAttempt.getID(), TaskAttemptEventType.TA_DONE)); + Assert.assertEquals(TaskAttemptStateInternal.SUCCEEDED, sourceAttempt.getInternalState()); + + // the event is propagated to map task's event handler + TezEvent tezEvent = new TezEvent(inputReadErrorEvent1, mockMeta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent, 11); + TaskAttemptStateInternal resultState = + new TaskAttemptImpl.OutputReportedFailedTransition().transition(sourceAttempt, outputFailedEvent); + // SUCCEEDED, as we haven't reached the limit for blaming by the number of downstream hosts + Assert.assertEquals(TaskAttemptStateInternal.SUCCEEDED, resultState); + + // the second event is propagated to map task's event handler + TezEvent tezEvent2 = new TezEvent(inputReadErrorEvent2, mockMeta); + TaskAttemptEventOutputFailed outputFailedEvent2 = + new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent2, 11); + TaskAttemptStateInternal resultState2 = + new TaskAttemptImpl.OutputReportedFailedTransition().transition(sourceAttempt, outputFailedEvent2); + + // now it's marked as FAILED + Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2); + } + private Event verifyEventType(List events, Class eventClass, int expectedOccurences) { int count = 0; @@ -2247,9 +2298,14 @@ public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, isRescheduled, resource, containerContext, leafVertex, mockTask, locationHint, null, null); } - + boolean inputFailedReported = false; - + + public MockTaskAttemptImpl setNodeId(NodeId nodeId) { + this.containerNodeId = nodeId; + return this; + } + @Override protected Vertex getVertex() { return mockVertex; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index ebea9a4f3f..f96a437a49 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -193,6 +193,7 @@ private void serializeEvent(DataOutput out) throws IOException { .setVersion(ideEvt.getVersion()) .setIsLocalFetch(ideEvt.isLocalFetch()) .setIsDiskErrorAtSource(ideEvt.isDiskErrorAtSource()) + .setDestinationLocalhostName(ideEvt.getDestinationLocalhostName()) .build(); break; case TASK_ATTEMPT_FAILED_EVENT: @@ -298,7 +299,8 @@ private void deserializeEvent(DataInput in) throws IOException { case INPUT_READ_ERROR_EVENT: InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input); event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(), - ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource()); + ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource(), + ideProto.getDestinationLocalhostName()); break; case TASK_ATTEMPT_FAILED_EVENT: TaskAttemptFailedEventProto tfProto = diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 64e6a71d7e..ab3c3db035 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -414,6 +414,28 @@ public class TezRuntimeConfiguration { public static final float TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT = 0.90f; + /** + * Enables fetch failures by a configuration. Should be used for testing only. + */ + @ConfigurationProperty(type = "boolean") + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS = + TEZ_RUNTIME_PREFIX + "shuffle.fetch.testing.errors.enable"; + public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT = false; + + /** + * Configures the injectable fetch failures, in a form of: + * maphost#probability#comma,separated,features + * Possible values are (fetch fails...): + * "*#50": from all map hosts with 50% likelihood + * "_first_#80": for the first ever seen map host with 80% likelihood (user doesn't want to use hostnames) + * "host1#100": from host1 with 100% likelihood (simulates single node failure) + * "host1#100#fail_only_first": as above but only for input attempts with index 0 + */ + @ConfigurationProperty(type = "string") + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG = + TEZ_RUNTIME_PREFIX + "shuffle.fetch.testing.errors.config"; + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG_DEFAULT = "*#50"; + @ConfigurationProperty(type = "float") public static final String TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT = TEZ_RUNTIME_PREFIX + "shuffle.memory.limit.percent"; @@ -543,7 +565,7 @@ public class TezRuntimeConfiguration { /** - * Share data fetched between tasks running on the same host if applicable + * Share data fetched between tasks running on the same host if applicable. */ @ConfigurationProperty(type = "boolean") public static final String TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH = TEZ_RUNTIME_PREFIX @@ -626,6 +648,8 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS); tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS); tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index f295c06b8e..6039df3a2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -58,6 +58,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -144,7 +145,7 @@ public String toString() { private final AtomicBoolean isShutDown = new AtomicBoolean(false); - private final int fetcherIdentifier; + protected final int fetcherIdentifier; // Parameters to track work. private List srcAttempts; @@ -192,7 +193,7 @@ public String getHost() { private final boolean isDebugEnabled = LOG.isDebugEnabled(); - private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, + protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, @@ -527,8 +528,7 @@ private HostFetchResult setupConnection(Collection attem } try { - input = httpConnection.getInputStream(); - httpConnection.validate(); + setupConnectionInternal(host, attempts); //validateConnectionResponse(msgToEncode, encHash); } catch (IOException e) { // ioErrs.increment(1); @@ -556,6 +556,13 @@ private HostFetchResult setupConnection(Collection attem return null; } + + protected void setupConnectionInternal(String host, Collection attempts) + throws IOException, InterruptedException { + input = httpConnection.getInputStream(); + httpConnection.validate(); + } + @VisibleForTesting protected HostFetchResult doHttpFetch(CachingCallBack callback) { @@ -1141,11 +1148,19 @@ public FetcherBuilder(FetcherCallback fetcherCallback, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, - String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, - jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, - lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, - verifyDiskChecksum, compositeFetch); + String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch, + boolean enableFetcherTestingErrors, ObjectRegistry objectRegistry) { + if (enableFetcherTestingErrors) { + this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, appId, dagIdentifier, + jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, + lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, + verifyDiskChecksum, compositeFetch, objectRegistry); + } else { + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, + jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, + lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, + verifyDiskChecksum, compositeFetch); + } } public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java new file mode 100644 index 0000000000..473025a72c --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.shuffle; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FetcherErrorTestingConfig { + private static final Logger LOG = LoggerFactory.getLogger(FetcherErrorTestingConfig.class); + private static final String KEY_CACHED_HOSTNAME = "FetcherErrorTestingConfig.host"; + + private String hostToFail = "*"; + private int probabilityPercent = 50; + private Random random = new Random(); + /** + * Whether to fail only in case of input attempts with index 0, + * this prevents continuous failure, and helps simulating a real-life node failure. + */ + private boolean failForFirstAttemptOnly = false; + private ObjectRegistry objectRegistry; + + public FetcherErrorTestingConfig(Configuration conf, ObjectRegistry objectRegistry) { + String errorConfig = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG_DEFAULT); + String[] configParts = errorConfig.split("#"); + if (configParts.length > 0) { + hostToFail = configParts[0]; + } + + if (configParts.length > 1) { + probabilityPercent = Integer.parseInt(configParts[1]); + } + + if (configParts.length > 2) { + List features = Arrays.asList(configParts[2].split(",")); + if (features.contains("fail_only_first")) { + failForFirstAttemptOnly = true; + } + } + + this.objectRegistry = objectRegistry; + if (hostToFail.equals("_first_")) { + String host = (String) objectRegistry.get(KEY_CACHED_HOSTNAME); + if (host != null) { + LOG.info("Get already stored hostname for fetcher test failures: " + host); + hostToFail = host; + } + } + } + + public boolean shouldFail(String host, InputAttemptIdentifier inputAttemptIdentifier) { + if (matchHost(host)) { + return (!failForFirstAttemptOnly || failForFirstAttemptOnly && inputAttemptIdentifier.getAttemptNumber() == 0) + && random.nextInt(100) < probabilityPercent; + } + return false; + } + + private boolean matchHost(String host) { + if (hostToFail.equals("_first_")) { + objectRegistry.cacheForVertex(KEY_CACHED_HOSTNAME, host); + hostToFail = host; + } + return "*".equals(hostToFail) || host.equalsIgnoreCase(hostToFail); + } + + @Override + public String toString() { + return String.format("[FetcherErrorTestingConfig: host: %s, probability: %d%%, failForFirstAttemptOnly: %s]", + hostToFail, probabilityPercent, failForFirstAttemptOnly); + } +} diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java new file mode 100644 index 0000000000..8261f8c6be --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.shuffle; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FetcherWithInjectableErrors extends Fetcher { + private static final Logger LOG = LoggerFactory.getLogger(FetcherWithInjectableErrors.class); + + private FetcherErrorTestingConfig fetcherErrorTestingConfig; + + protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConnectionParams params, + FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, + JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, + RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, + boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, + boolean compositeFetch, ObjectRegistry objectRegistry) { + super(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretManager, srcNameTrimmed, conf, + localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, + asyncHttp, verifyDiskChecksum, compositeFetch); + this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry); + LOG.info("Initialized FetcherWithInjectableErrors with config: {}", fetcherErrorTestingConfig); + } + + @Override + protected void setupConnectionInternal(String host, Collection attempts) + throws IOException, InterruptedException { + LOG.info("Checking if fetcher should fail for host: {} ...", host); + for (InputAttemptIdentifier inputAttemptIdentifier : attempts) { + if (fetcherErrorTestingConfig.shouldFail(host, inputAttemptIdentifier)) { + throw new IOException(String.format( + "FetcherWithInjectableErrors tester made failure for host: %s, input attempt: %s", host, + inputAttemptIdentifier.getAttemptNumber())); + } + } + super.setupConnectionInternal(host, attempts); + } + + @Override + public int hashCode() { + return fetcherIdentifier; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + FetcherWithInjectableErrors other = (FetcherWithInjectableErrors) obj; + if (fetcherIdentifier != other.fetcherIdentifier) { + return false; + } + return true; + } +} diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 56195a8641..fa539c8374 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -174,7 +174,8 @@ public class ShuffleManager implements FetcherCallback { private final boolean sharedFetchEnabled; private final boolean verifyDiskChecksum; private final boolean compositeFetch; - + private final boolean enableFetcherTestingErrors; + private final int ifileBufferSize; private final boolean ifileReadAhead; private final int ifileReadAheadLength; @@ -259,6 +260,10 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); + this.enableFetcherTestingErrors = + conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT); + this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); completedInputSet = new BitSet(numInputs); @@ -395,7 +400,7 @@ protected Void callInternal() throws Exception { for (InputReadErrorEvent key : failedEvents.keySet()) { failedEventsToSend.add(InputReadErrorEvent.create(key.getDiagnostics(), key.getIndex(), key.getVersion(), failedEvents.get(key), key.isLocalFetch(), - key.isDiskErrorAtSource())); + key.isDiskErrorAtSource(), localhostName)); } inputContext.sendEvents(failedEventsToSend); failedEvents.clear(); @@ -543,7 +548,8 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, - localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); + localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch, enableFetcherTestingErrors, + inputContext.getObjectRegistry()); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); @@ -960,7 +966,7 @@ public void fetchFailed(String host, srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber(), inputAttemptFetchFailure.isLocalFetch(), - inputAttemptFetchFailure.isDiskErrorAtSource()); + inputAttemptFetchFailure.isDiskErrorAtSource(), localhostName); if (maxTimeToWaitForReportMillis > 0) { try { reportLock.lock(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index c9bd092f05..6baffe2c6b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -81,7 +81,7 @@ class FetcherOrderedGrouped extends CallableWithNdc { private final int localShufflePort; private final String applicationId; private final int dagId; - private final MapHost mapHost; + protected final MapHost mapHost; private final int minPartition; private final int maxPartition; @@ -350,8 +350,7 @@ boolean setupConnection(MapHost host, Collection attempt LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning"); return false; } - input = httpConnection.getInputStream(); - httpConnection.validate(); + setupConnectionInternal(host, attempts); return true; } catch (IOException | InterruptedException ie) { if (ie instanceof InterruptedException) { @@ -385,6 +384,12 @@ boolean setupConnection(MapHost host, Collection attempt } } + protected void setupConnectionInternal(MapHost host, Collection attempts) + throws IOException, InterruptedException { + input = httpConnection.getInputStream(); + httpConnection.validate(); + } + @VisibleForTesting protected void putBackRemainingMapOutputs(MapHost host) { // Cycle through remaining MapOutputs @@ -426,7 +431,7 @@ public String toString() { } protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream input, - InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException { + InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException, IOException { MapOutput mapOutput = null; InputAttemptIdentifier srcAttemptId = null; long decompressedLength = 0; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java new file mode 100644 index 0000000000..d16af4560f --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.FetcherErrorTestingConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FetcherOrderedGroupedWithInjectableErrors extends FetcherOrderedGrouped { + private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGroupedWithInjectableErrors.class); + + private FetcherErrorTestingConfig fetcherErrorTestingConfig; + + public FetcherOrderedGroupedWithInjectableErrors(HttpConnectionParams httpConnectionParams, + ShuffleScheduler scheduler, FetchedInputAllocatorOrderedGrouped allocator, ExceptionReporter exceptionReporter, + JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, + Configuration conf, RawLocalFileSystem localFs, boolean localDiskFetchEnabled, String localHostname, + int shufflePort, String srcNameTrimmed, MapHost mapHost, TezCounter ioErrsCounter, + TezCounter wrongLengthErrsCounter, TezCounter badIdErrsCounter, TezCounter wrongMapErrsCounter, + TezCounter connectionErrsCounter, TezCounter wrongReduceErrsCounter, String applicationId, int dagId, + boolean asyncHttp, boolean sslShuffle, boolean verifyDiskChecksum, boolean compositeFetch, + ObjectRegistry objectRegistry) { + super(httpConnectionParams, scheduler, allocator, exceptionReporter, jobTokenSecretMgr, ifileReadAhead, + ifileReadAheadLength, codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, + mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, + wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, verifyDiskChecksum, compositeFetch); + this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry); + LOG.info("Initialized FetcherOrderedGroupedWithInjectableErrors with config: {}", fetcherErrorTestingConfig); + } + + @Override + protected void setupConnectionInternal(MapHost host, Collection attempts) + throws IOException, InterruptedException { + LOG.info("Checking if fetcher should fail for host: {} ...", mapHost.getHost()); + for (InputAttemptIdentifier inputAttemptIdentifier : attempts) { + if (fetcherErrorTestingConfig.shouldFail(mapHost.getHost(), inputAttemptIdentifier)) { + throw new IOException(String.format( + "FetcherOrderedGroupedWithInjectableErrors tester made failure for host: %s, input attempt: %s", + mapHost.getHost(), inputAttemptIdentifier.getAttemptNumber())); + } + } + super.setupConnectionInternal(host, attempts); + } +} diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 540d44f409..9ae2972d2e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -247,7 +247,7 @@ enum ShuffleErrors { private final boolean checkFailedFetchSinceLastCompletion; private final boolean verifyDiskChecksum; private final boolean compositeFetch; - + private final boolean enableFetcherTestingErrors; private volatile Thread shuffleSchedulerThread = null; private final int maxPenaltyTime; @@ -426,6 +426,10 @@ public ShuffleScheduler(InputContext inputContext, this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT); + this.enableFetcherTestingErrors = + conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT); + pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " @@ -442,6 +446,7 @@ public ShuffleScheduler(InputContext inputContext, + ", minReqProgressFraction=" + minReqProgressFraction + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion + ", asyncHttp=" + asyncHttp + + ", enableFetcherTestingErrors=" + enableFetcherTestingErrors ); } @@ -884,7 +889,7 @@ private void informAM(InputAttemptFetchFailure fetchFailure) { srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber(), - fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource())); + fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource(), localHostname)); inputContext.sendEvents(failedEvents); } @@ -1464,12 +1469,21 @@ private synchronized void waitAndNotifyProgress() throws InterruptedException { @VisibleForTesting FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { - return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, - exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, - codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, - ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, - verifyDiskChecksum, compositeFetch); + if (enableFetcherTestingErrors) { + return new FetcherOrderedGroupedWithInjectableErrors(httpConnectionParams, ShuffleScheduler.this, allocator, + exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, + codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, + ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, + connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, + verifyDiskChecksum, compositeFetch, inputContext.getObjectRegistry()); + } else { + return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, + exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, + codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, + ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, + connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, + verifyDiskChecksum, compositeFetch); + } } private class FetchFutureCallback implements FutureCallback { From c523ebba8f14c1256cb992109c18ce023fde575c Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Wed, 20 Oct 2021 14:38:37 +0200 Subject: [PATCH 2/2] TEZ-4338: host fraction considering active nodes, src vertex filter at injectable errors --- .../apache/tez/dag/api/TezConfiguration.java | 19 +++++++----- .../api/ContainerLauncherContext.java | 2 +- .../org/apache/tez/dag/app/dag/Vertex.java | 4 +-- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 30 +++++++++++++++---- .../tez/dag/app/dag/impl/VertexImpl.java | 14 ++++----- .../tez/dag/app/rm/node/AMNodeImpl.java | 6 ++++ .../tez/dag/app/rm/node/AMNodeTracker.java | 13 ++++++-- .../dag/app/rm/node/PerSourceNodeTracker.java | 4 +++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 15 ++++++++-- .../library/api/TezRuntimeConfiguration.java | 11 +++---- .../shuffle/FetcherErrorTestingConfig.java | 28 +++++++++++++---- .../shuffle/FetcherWithInjectableErrors.java | 4 ++- ...herOrderedGroupedWithInjectableErrors.java | 4 ++- 13 files changed, 112 insertions(+), 42 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 2ae1887682..5063b04eac 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -301,19 +301,22 @@ public TezConfiguration(boolean loadDefaults) { public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300; /** - * int value. The maximum number of distinct downstream hosts that can report a fetch failure - * for a single upstream host before the upstream task attempt is marked as failed (so blamed for - * the fetch failure). E.g. if this set to 1, in case of 2 different hosts reporting fetch failure - * for the same upstream host the upstream task is immediately blamed for the fetch failure. - * TODO: could this be proportional to the number of hosts running consumer/downstream tasks ? + * Double value. Assuming that a certain number of downstream hosts reported fetch failure for a + * given upstream host, this config drives the max allowed ratio of (downstream hosts) / (all hosts). + * The total number of used hosts are tracked by AMNodeTracker, which divides the distinct number of + * downstream hosts blaming source(upstream) tasks in a given vertex. If the fraction is beyond this + * limit, the upstream task attempt is marked as failed (so blamed for the fetch failure). + * E.g. if this set to 0.2, in case of 3 different hosts reporting fetch failure + * for the same upstream host in a cluster which currently utilizes 10 nodes, the upstream task + * is immediately blamed for the fetch failure. * * Expert level setting. */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") - public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE = - TEZ_AM_PREFIX + "max.allowed.downstream.hosts.reporting.fetch.failure"; - public static final int TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE_DEFAULT = 1; + public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION = + TEZ_AM_PREFIX + "max.allowed.downstream.host.failures.fraction"; + public static final double TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT = 0.2; /** * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java index ed1d58f78a..16d54f05b7 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -80,7 +80,7 @@ void containerCompleted(ContainerId containerId, int exitStatus, String diagnost * Get the number of nodes being handled by the specified source * * @param sourceName the relevant source name - * @return the initial payload + * @return the number of nodes */ int getNumNodes(String sourceName); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index c03b6486a3..ff83e19a85 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -231,9 +231,9 @@ interface VertexConfig { */ int getMaxAllowedTimeForTaskReadErrorSec(); /** - * @return tez.am.max.allowed.downstream.hosts.reporting.fetch.failure. + * @return tez.am.max.allowed.downstream.host.failures.fraction. */ - int getMaxAllowedDownstreamHostsReportingFetchFailure(); + double getMaxAllowedDownstreamHostFailuresFraction(); } void incrementRejectedTaskAttemptCount(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index f215d8757f..e6b9e82000 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1826,10 +1826,14 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt, downstreamBlamingHosts.get(sHost).add(dHost); int currentNumberOfFailingDownstreamHosts = downstreamBlamingHosts.get(sHost).size(); - if (currentNumberOfFailingDownstreamHosts > sourceAttempt.getVertex().getVertexConfig() - .getMaxAllowedDownstreamHostsReportingFetchFailure()) { - LOG.info("Host will be marked fail: {} because of {} distinct upstream hosts having fetch failures", sHost, - currentNumberOfFailingDownstreamHosts); + int numNodes = getNumNodes(sourceAttempt); + float hostFailureFraction = numNodes > 0 ? ((float) currentNumberOfFailingDownstreamHosts) / numNodes : 0; + double maxAllowedHostFailureFraction = sourceAttempt.getVertex().getVertexConfig() + .getMaxAllowedDownstreamHostFailuresFraction(); + + if (hostFailureFraction > maxAllowedHostFailureFraction) { + LOG.info("Host will be marked fail: {} because of host failure fraction {} is beyond the limit {}", sHost, + hostFailureFraction, maxAllowedHostFailureFraction); tooManyDownstreamHostsBlamedTheSameUpstreamHost = true; } @@ -1874,10 +1878,12 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt, + maxAllowedOutputFailuresFraction + ", uniquefailedOutputReports=" + sourceAttempt.uniquefailedOutputReports.size() + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures + + ", hostFailureFraction=" + hostFailureFraction + + " (" + currentNumberOfFailingDownstreamHosts + " / " + numNodes + ")" + + ", MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION=" + + maxAllowedHostFailureFraction + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + maxAllowedTimeForTaskReadErrorSec - + ", tooManyDownstreamHostsBlamedTheSameUpstreamHost: " + tooManyDownstreamHostsBlamedTheSameUpstreamHost - + " ("+currentNumberOfFailingDownstreamHosts+")" + ", readErrorTimespan=" + readErrorTimespanSec + ", isLocalFetch=" + readErrorEvent.isLocalFetch() + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource(); @@ -1899,6 +1905,18 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt, // TODO at some point. Nodes may be interested in FetchFailure info. // Can be used to blacklist nodes. } + + private int getNumNodes(TaskAttemptImpl sourceAttempt) { + Vertex vertex = sourceAttempt.getVertex(); + String taskSchedulerName = vertex.getServicePluginInfo().getTaskSchedulerName(); + int sourceIndex = vertex.getAppContext().getTaskScheduerIdentifier(taskSchedulerName); + int numActiveNodes = vertex.getAppContext().getNodeTracker().getNumActiveNodes(sourceIndex); + if (LOG.isDebugEnabled()) { + int numAllNodes = vertex.getAppContext().getNodeTracker().getNumNodes(sourceIndex); + LOG.debug("Getting nodes, active/all: {}/{}", numActiveNodes, numAllNodes); + } + return numActiveNodes; + } } @VisibleForTesting diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index a40166a321..f22f6de250 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4841,9 +4841,9 @@ static class VertexConfigImpl implements VertexConfig { */ private final int maxAllowedTimeForTaskReadErrorSec; /** - * See tez.am.max.allowed.downstream.hosts.reporting.fetch.failure. + * See tez.am.max.allowed.downstream.host.failures.fraction. */ - private final int maxAllowedDownstreamHostsReportingFetchFailure; + private final double maxAllowedDownstreamHostFailuresFraction; public VertexConfigImpl(Configuration conf) { this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, @@ -4869,9 +4869,9 @@ public VertexConfigImpl(Configuration conf) { TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); - this.maxAllowedDownstreamHostsReportingFetchFailure = conf.getInt( - TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE, - TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE_DEFAULT); + this.maxAllowedDownstreamHostFailuresFraction = conf.getDouble( + TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION, + TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT); } @Override @@ -4918,8 +4918,8 @@ public boolean getTaskRescheduleRelaxedLocality() { /** * @return maxAllowedDownstreamHostsReportingFetchFailure. */ - @Override public int getMaxAllowedDownstreamHostsReportingFetchFailure() { - return maxAllowedDownstreamHostsReportingFetchFailure; + @Override public double getMaxAllowedDownstreamHostFailuresFraction() { + return maxAllowedDownstreamHostFailuresFraction; } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index df19534bfc..26796d2af4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -509,4 +509,10 @@ public void dagComplete(DAG dag) { this.writeLock.unlock(); } } + + public String toString() { + return String.format( + "{AMNodeImpl: nodeId: %s, state: %s, containers: %d, completed containers: %d, healthy: %s, blackListed: %s}", + nodeId, getState(), getContainers().size(), completedContainers.size(), !isUnhealthy(), isBlacklisted()); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 1536170fac..8c81cb52c9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -134,6 +134,17 @@ public int getNumNodes(int schedulerId) { return perSourceNodeTrackers.get(schedulerId).getNumNodes(); } + /** + * Retrieve the number of nodes in ACTIVE state. This number is suitable for deciding + * how many nodes can be potentially used for running containers at the moment. + * + * @param schedulerId the schedulerId for which the node count is required + * @return the number of nodes from the scheduler being in ACTIVE state + */ + public int getNumActiveNodes(int schedulerId) { + return perSourceNodeTrackers.get(schedulerId).getNumActiveNodes(); + } + @Private @VisibleForTesting public boolean isBlacklistingIgnored(int schedulerId) { @@ -158,6 +169,4 @@ private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerI } return nodeTracker; } - - } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java index 74c6176e4c..9906644fe1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java @@ -84,6 +84,10 @@ public int getNumNodes() { return nodeMap.size(); } + public int getNumActiveNodes() { + return (int) nodeMap.values().stream().filter(node -> node.getState() == AMNodeState.ACTIVE).count(); + } + public void handle(AMNodeEvent rEvent) { // No synchronization required until there's multiple dispatchers. NodeId nodeId = rEvent.getNodeId(); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 5096deba28..7a2a05fb26 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -108,6 +108,7 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; +import org.apache.tez.dag.app.rm.node.AMNodeTracker; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -178,8 +179,14 @@ private void createMockVertex(Configuration conf) { mockVertex = mock(Vertex.class); when(mockVertex.getDownstreamBlamingHosts()).thenReturn(Maps.newHashMap()); when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); - when(mockVertex.getVertexConfig()).thenReturn( - new VertexImpl.VertexConfigImpl(conf)); + when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf)); + AppContext appContext = mock(AppContext.class); + when(appContext.getTaskScheduerIdentifier(Mockito.anyString())).thenReturn(0); + when(mockVertex.getAppContext()).thenReturn(appContext); + AMNodeTracker nodeTracker = mock(AMNodeTracker.class); + when(nodeTracker.getNumNodes(Mockito.anyInt())).thenReturn(10); + when(nodeTracker.getNumActiveNodes(Mockito.anyInt())).thenReturn(8); + when(appContext.getNodeTracker()).thenReturn(nodeTracker); } @Test(timeout = 5000) @@ -2238,7 +2245,8 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() { new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent, 11); TaskAttemptStateInternal resultState = new TaskAttemptImpl.OutputReportedFailedTransition().transition(sourceAttempt, outputFailedEvent); - // SUCCEEDED, as we haven't reached the limit for blaming by the number of downstream hosts + // SUCCEEDED, as we haven't reached the host limit fraction + // active nodes: 8, failed hosts: 1, fraction 0.125 (< 0.2) Assert.assertEquals(TaskAttemptStateInternal.SUCCEEDED, resultState); // the second event is propagated to map task's event handler @@ -2249,6 +2257,7 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() { new TaskAttemptImpl.OutputReportedFailedTransition().transition(sourceAttempt, outputFailedEvent2); // now it's marked as FAILED + // active nodes: 8, failed hosts: 2, fraction 0.25 (> 0.2) Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index ab3c3db035..1c747af2bb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -424,12 +424,13 @@ public class TezRuntimeConfiguration { /** * Configures the injectable fetch failures, in a form of: - * maphost#probability#comma,separated,features + * maphost#mapvertex#probability#comma,separated,features * Possible values are (fetch fails...): - * "*#50": from all map hosts with 50% likelihood - * "_first_#80": for the first ever seen map host with 80% likelihood (user doesn't want to use hostnames) - * "host1#100": from host1 with 100% likelihood (simulates single node failure) - * "host1#100#fail_only_first": as above but only for input attempts with index 0 + * "*#*#50": from all map hosts with 50% likelihood + * "_first_#*#80": for the first ever seen map host with 80% likelihood (user doesn't want to use hostnames) + * "host1#*#100": from host1 with 100% likelihood (simulates single node failure) + * "host1#Map_1#100": from host1 for Map 1 source tasks with 100% likelihood + * "host1#Map_1#100#fail_only_first": as above but only for input attempts with index 0 */ @ConfigurationProperty(type = "string") public static final String TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG = diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java index 473025a72c..ce15a87fb9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherErrorTestingConfig.java @@ -22,6 +22,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -33,6 +34,7 @@ public class FetcherErrorTestingConfig { private static final String KEY_CACHED_HOSTNAME = "FetcherErrorTestingConfig.host"; private String hostToFail = "*"; + private String srcNameTrimmedToFail = "*"; private int probabilityPercent = 50; private Random random = new Random(); /** @@ -46,16 +48,25 @@ public FetcherErrorTestingConfig(Configuration conf, ObjectRegistry objectRegist String errorConfig = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG_DEFAULT); String[] configParts = errorConfig.split("#"); + + // e.g. host_1 if (configParts.length > 0) { hostToFail = configParts[0]; } + // e.g. Map 1 or Map_1, both will work if (configParts.length > 1) { - probabilityPercent = Integer.parseInt(configParts[1]); + srcNameTrimmedToFail = TezUtilsInternal.cleanVertexName(configParts[1]); } + // e.g. 50 if (configParts.length > 2) { - List features = Arrays.asList(configParts[2].split(",")); + probabilityPercent = Integer.parseInt(configParts[2]); + } + + // e.g. fail_only_first + if (configParts.length > 3) { + List features = Arrays.asList(configParts[3].split(",")); if (features.contains("fail_only_first")) { failForFirstAttemptOnly = true; } @@ -71,8 +82,8 @@ public FetcherErrorTestingConfig(Configuration conf, ObjectRegistry objectRegist } } - public boolean shouldFail(String host, InputAttemptIdentifier inputAttemptIdentifier) { - if (matchHost(host)) { + public boolean shouldFail(String host, String srcNameTrimmed, InputAttemptIdentifier inputAttemptIdentifier) { + if (matchHost(host) && matchSourceVertex(srcNameTrimmed)) { return (!failForFirstAttemptOnly || failForFirstAttemptOnly && inputAttemptIdentifier.getAttemptNumber() == 0) && random.nextInt(100) < probabilityPercent; } @@ -87,9 +98,14 @@ private boolean matchHost(String host) { return "*".equals(hostToFail) || host.equalsIgnoreCase(hostToFail); } + private boolean matchSourceVertex(String srcNameTrimmed) { + return "*".equals(srcNameTrimmedToFail) || srcNameTrimmed.equalsIgnoreCase(srcNameTrimmedToFail); + } + @Override public String toString() { - return String.format("[FetcherErrorTestingConfig: host: %s, probability: %d%%, failForFirstAttemptOnly: %s]", - hostToFail, probabilityPercent, failForFirstAttemptOnly); + return String.format( + "[FetcherErrorTestingConfig: host: %s, source vertex: %s, probability: %d%%, failForFirstAttemptOnly: %s]", + hostToFail, srcNameTrimmedToFail, probabilityPercent, failForFirstAttemptOnly); } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java index 8261f8c6be..951adf9f5d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java @@ -32,6 +32,7 @@ public class FetcherWithInjectableErrors extends Fetcher { private static final Logger LOG = LoggerFactory.getLogger(FetcherWithInjectableErrors.class); private FetcherErrorTestingConfig fetcherErrorTestingConfig; + private String srcNameTrimmed; protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, @@ -43,6 +44,7 @@ protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConne localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry); + this.srcNameTrimmed = srcNameTrimmed; LOG.info("Initialized FetcherWithInjectableErrors with config: {}", fetcherErrorTestingConfig); } @@ -51,7 +53,7 @@ protected void setupConnectionInternal(String host, Collection