diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java index 9ce1b10aa0..438d5f0cd4 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java @@ -27,6 +27,7 @@ import com.google.common.collect.Interner; import com.google.common.collect.Interners; +import org.apache.hadoop.yarn.api.records.ApplicationId; /** * TezTaskAttemptID represents the immutable and unique identifier for @@ -76,6 +77,18 @@ private TezTaskAttemptID(TezTaskID taskId, int id) { public TezTaskID getTaskID() { return taskId; } + + public TezVertexID getVertexID() { + return taskId.getVertexID(); + } + + public TezDAGID getDAGId() { + return taskId.getDAGId(); + } + + public ApplicationId getApplicationId() { + return taskId.getApplicationId(); + } @Override public boolean equals(Object o) { @@ -162,5 +175,4 @@ public static TezTaskAttemptID fromString(String taIdStr) { } return null; } - } diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java index 993df7c87a..cf56ceedcc 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.Preconditions; import org.apache.tez.util.FastNumberFormat; @@ -83,6 +84,14 @@ public TezVertexID getVertexID() { return vertexId; } + public TezDAGID getDAGId() { + return vertexId.getDAGId(); + } + + public ApplicationId getApplicationId() { + return vertexId.getApplicationId(); + } + @Override public boolean equals(Object o) { if (!super.equals(o)) diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java index 5ecfca6a49..5c2baa4c65 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.Preconditions; import org.apache.tez.util.FastNumberFormat; @@ -83,6 +84,11 @@ public TezDAGID getDAGId() { return dagId; } + + public ApplicationId getApplicationId() { + return dagId.getApplicationId(); + } + @Override public boolean equals(Object o) { if (!super.equals(o)) @@ -158,5 +164,4 @@ public static TezVertexID fromString(String vertexIdStr) { } return null; } - } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index abc10bd86c..4f3c5c917c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2187,12 +2187,12 @@ private class TaskEventDispatcher implements EventHandler { public void handle(TaskEvent event) { DAG dag = context.getCurrentDAG(); int eventDagIndex = - event.getTaskID().getVertexID().getDAGId().getId(); + event.getDAGId().getId(); if (dag == null || eventDagIndex != dag.getID().getId()) { return; // event not relevant any more } Task task = - dag.getVertex(event.getTaskID().getVertexID()). + dag.getVertex(event.getVertexID()). getTask(event.getTaskID()); ((EventHandler)task).handle(event); } @@ -2217,13 +2217,13 @@ private class TaskAttemptEventDispatcher public void handle(TaskAttemptEvent event) { DAG dag = context.getCurrentDAG(); int eventDagIndex = - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); + event.getDAGId().getId(); if (dag == null || eventDagIndex != dag.getID().getId()) { return; // event not relevant any more } Task task = - dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()). - getTask(event.getTaskAttemptID().getTaskID()); + dag.getVertex(event.getVertexID()). + getTask(event.getTaskID()); TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID()); ((EventHandler) attempt).handle(event); } @@ -2236,7 +2236,7 @@ private class VertexEventDispatcher public void handle(VertexEvent event) { DAG dag = context.getCurrentDAG(); int eventDagIndex = - event.getVertexId().getDAGId().getId(); + event.getDAGId().getId(); if (dag == null || eventDagIndex != dag.getID().getId()) { return; // event not relevant any more } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 19c24f300c..ce3b62bbd5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -846,9 +846,9 @@ public DAGRecoveryData parseRecoveryData() throws IOException { case TASK_STARTED: { TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event; - VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID()); + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID()); Preconditions.checkArgument(vertexRecoveryData != null, - "Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID()); + "Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID()); TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID()); taskRecoveryData.taskStartedEvent = taskStartedEvent; break; @@ -856,9 +856,9 @@ public DAGRecoveryData parseRecoveryData() throws IOException { case TASK_FINISHED: { TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event; - VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID()); + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID()); Preconditions.checkArgument(vertexRecoveryData != null, - "Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID()); + "Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID()); TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID()); taskRecoveryData.taskFinishedEvent = taskFinishedEvent; break; @@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException { { TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event; VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get( - taStartedEvent.getTaskAttemptID().getTaskID().getVertexID()); + taStartedEvent.getVertexID()); Preconditions.checkArgument(vertexRecoveryData != null, "Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID()); TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap @@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException { { TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event; VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get( - taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID()); + taFinishedEvent.getVertexID()); Preconditions.checkArgument(vertexRecoveryData != null, "Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID()); TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index ac2f7605ae..51895f4afd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) } } if (!eventsForVertex.isEmpty()) { - TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); + TezVertexID vertexId = taskAttemptID.getVertexID(); sendEvent( new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex))); } taskHeartbeatHandler.pinged(taskAttemptID); eventInfo = context .getCurrentDAG() - .getVertex(taskAttemptID.getTaskID().getVertexID()) + .getVertex(taskAttemptID.getVertexID()) .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(), request.getMaxEvents()); } @@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { DAG job = context.getCurrentDAG(); Task task = - job.getVertex(taskAttemptId.getTaskID().getVertexID()). + job.getVertex(taskAttemptId.getVertexID()). getTask(taskAttemptId.getTaskID()); return task.canCommit(taskAttemptId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java index 2fa735ef8f..40bb097f1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java @@ -52,7 +52,7 @@ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) { public void scheduleTask(DAGEventSchedulerUpdate event) { VertexInfo vInfo = null; if (vertexInfo != null) { - vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); + vInfo = vertexInfo.get(event.getVertexID()); } scheduleTaskWithLimit(event, vInfo); } @@ -71,7 +71,7 @@ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vIn public void taskCompleted(DAGEventSchedulerUpdate event) { taskCompletedEx(event); if (vertexInfo != null) { - VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); + VertexInfo vInfo = vertexInfo.get(event.getVertexID()); if (vInfo != null) { if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) { vInfo.concurrency--; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 3c4a05eb84..a824b68d76 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -457,7 +457,7 @@ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId) Iterator eventIterator = events.iterator(); while (eventIterator.hasNext()) { TezEvent tezEvent = eventIterator.next(); - int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId(); + int taskIndex = tezEvent.getSourceInfo().getTaskID().getId(); int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId(); if (taskIndex == taskId.getId()) { // Process only if there's a pending event for the specific succeeded task @@ -476,7 +476,7 @@ public void handleInputInitializerEvents(Collection tezEvents) { List toForwardEvents = new LinkedList(); for (TezEvent tezEvent : tezEvents) { String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName(); - int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId(); + int taskIndex = tezEvent.getSourceInfo().getTaskID().getId(); int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId(); Map vertexSuccessfulAttemptMap = diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java index eda02b52da..fef36f8ae5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java @@ -19,9 +19,10 @@ package org.apache.tez.dag.app.dag.event; import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.records.TezVertexID; public class DAGEventSchedulerUpdate extends DAGEvent { - + public enum UpdateType { TA_SCHEDULE, TA_COMPLETED @@ -44,4 +45,8 @@ public UpdateType getUpdateType() { public TaskAttempt getAttempt() { return attempt; } + + public TezVertexID getVertexID() { + return attempt.getVertexID(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java index d5745c4df4..7ab6141c39 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java @@ -34,7 +34,7 @@ public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttempt public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state, long timestamp, boolean justStarted) { - super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getTaskID().getVertexID()); + super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getVertexID()); this.id = taId; this.state = state; this.timestamp = timestamp; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java index 63ef70feb4..5146a2ebe6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java @@ -19,7 +19,10 @@ package org.apache.tez.dag.app.dag.event; import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; /** * This class encapsulates task attempt related events. @@ -42,6 +45,18 @@ public TaskAttemptEvent(TezTaskAttemptID id, TaskAttemptEventType type) { public TezTaskAttemptID getTaskAttemptID() { return attemptID; } + + public TezDAGID getDAGId() { + return attemptID.getDAGId(); + } + + public TezVertexID getVertexID() { + return attemptID.getVertexID(); + } + + public TezTaskID getTaskID() { + return attemptID.getTaskID(); + } @Override public int getSerializingHash() { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java index def9ddfa7c..7a968e2f08 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java @@ -19,7 +19,9 @@ package org.apache.tez.dag.app.dag.event; import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; /** * this class encapsulates task related events. @@ -37,7 +39,16 @@ public TaskEvent(TezTaskID taskId, TaskEventType type) { public TezTaskID getTaskID() { return taskId; } - + + + public TezDAGID getDAGId() { + return taskId.getDAGId(); + } + + public TezVertexID getVertexID() { + return taskId.getVertexID(); + } + @Override public int getSerializingHash() { return taskId.getSerializingHash(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java index 33128e4536..983de0dc03 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.event; import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; /** @@ -37,4 +38,8 @@ public VertexEvent(TezVertexID vertexId, VertexEventType type) { public TezVertexID getVertexId() { return vertexId; } + + public TezDAGID getDAGId() { + return vertexId.getDAGId(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java index 5b07674422..e23d27cf6e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java @@ -28,7 +28,7 @@ public class VertexEventTaskAttemptCompleted extends VertexEvent { public VertexEventTaskAttemptCompleted(TezTaskAttemptID taskAttemptId, TaskAttemptStateInternal state) { - super(taskAttemptId.getTaskID().getVertexID(), + super(taskAttemptId.getVertexID(), VertexEventType.V_TASK_ATTEMPT_COMPLETED); this.attemptId = taskAttemptId; this.attempState = state; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 07715cdfe7..8e730902d9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1040,7 +1040,7 @@ public VertexStatusBuilder getVertexStatus(String vertexName, } public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) { - return (TaskAttemptImpl) getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID()) + return (TaskAttemptImpl) getVertex(taId.getVertexID()).getTask(taId.getTaskID()) .getAttempt(taId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e6b9e82000..b02bf43951 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -599,7 +599,7 @@ public TezTaskID getTaskID() { @Override public TezVertexID getVertexID() { - return attemptId.getTaskID().getVertexID(); + return attemptId.getVertexID(); } @Override @@ -884,12 +884,12 @@ public void handle(TaskAttemptEvent event) { LOG.error("Can't handle this event at current state for " + this.attemptId, e); eventHandler.handle(new DAGEventDiagnosticsUpdate( - this.attemptId.getTaskID().getVertexID().getDAGId(), + getDAGID(), "Invalid event " + event.getType() + " on TaskAttempt " + this.attemptId)); eventHandler.handle( new DAGEvent( - this.attemptId.getTaskID().getVertexID().getDAGId(), + getDAGID(), DAGEventType.INTERNAL_ERROR) ); } catch (RuntimeException e) { @@ -897,13 +897,13 @@ public void handle(TaskAttemptEvent event) { + " at current state " + oldState + " for " + this.attemptId, e); eventHandler.handle(new DAGEventDiagnosticsUpdate( - this.attemptId.getTaskID().getVertexID().getDAGId(), + getDAGID(), "Uncaught exception when handling event " + event.getType() + " on TaskAttempt " + this.attemptId + " at state " + oldState + ", error=" + e.getMessage())); eventHandler.handle( new DAGEvent( - this.attemptId.getTaskID().getVertexID().getDAGId(), + getDAGID(), DAGEventType.INTERNAL_ERROR) ); } @@ -1856,7 +1856,7 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt, boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec; int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex( - failedDestTaId.getTaskID().getVertexID()).getRunningTasks(); + failedDestTaId.getVertexID()).getRunningTasks(); float failureFraction = runningTasks > 0 ? ((float) sourceAttempt.uniquefailedOutputReports.size()) / runningTasks : 0; boolean withinFailureFractionLimits = diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index cb8545f8ca..64b3db4c01 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -41,6 +41,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,10 +98,6 @@ import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -865,17 +866,17 @@ protected void internalError(TaskEventType type) { LOG.error("Invalid event " + type + " on Task " + this.taskId + " in state:" + getInternalState()); eventHandler.handle(new DAGEventDiagnosticsUpdate( - this.taskId.getVertexID().getDAGId(), "Invalid event " + type + + getDAGId(), "Invalid event " + type + " on Task " + this.taskId)); - eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(), + eventHandler.handle(new DAGEvent(getDAGId(), DAGEventType.INTERNAL_ERROR)); } protected void internalErrorUncaughtException(TaskEventType type, Exception e) { eventHandler.handle(new DAGEventDiagnosticsUpdate( - this.taskId.getVertexID().getDAGId(), "Uncaught exception when handling event " + type + + getDAGId(), "Uncaught exception when handling event " + type + " on Task " + this.taskId + ", error=" + e.getMessage())); - eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(), + eventHandler.handle(new DAGEvent(getDAGId(), DAGEventType.INTERNAL_ERROR)); } @@ -918,7 +919,7 @@ protected void logJobHistoryTaskStartedEvent() { TaskStartedEvent startEvt = new TaskStartedEvent(taskId, getVertex().getName(), scheduledTime, getLaunchTime()); this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(taskId.getVertexID().getDAGId(), startEvt)); + new DAGHistoryEvent(getDAGId(), startEvt)); } protected void logJobHistoryTaskFinishedEvent() { @@ -930,7 +931,7 @@ protected void logJobHistoryTaskFinishedEvent() { successfulAttempt, TaskState.SUCCEEDED, "", getCounters(), failedAttempts); this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt)); + new DAGHistoryEvent(getDAGId(), finishEvt)); } protected void logJobHistoryTaskFailedEvent(TaskState finalState) { @@ -941,7 +942,11 @@ protected void logJobHistoryTaskFailedEvent(TaskState finalState) { StringUtils.join(getDiagnostics(), LINE_SEPARATOR), getCounters(), failedAttempts); this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt)); + new DAGHistoryEvent(getDAGId(), finishEvt)); + } + + private TezDAGID getDAGId() { + return taskId.getDAGId(); } private void addDiagnosticInfo(String diag) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f22f6de250..71bcfb537c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2919,7 +2919,7 @@ private static List getTaskAttemptIdentifiers(DAG dag, List attempts = new ArrayList(taIds.size()); String dagName = dag.getName(); for (TezTaskAttemptID taId : taIds) { - String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName(); + String vertexName = dag.getVertex(taId.getVertexID()).getName(); attempts.add(getTaskAttemptIdentifier(dagName, vertexName, taId)); } return attempts; @@ -3639,7 +3639,7 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) { TezTaskAttemptID taId = completionEvent.getTaskAttemptId(); vertex.vertexManager.onSourceTaskCompleted( getTaskAttemptIdentifier(vertex.dag.getName(), - vertex.dag.getVertex(taId.getTaskID().getVertexID()).getName(), + vertex.dag.getVertex(taId.getVertexID()).getName(), taId)); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); @@ -4090,7 +4090,7 @@ private void handleRoutedTezEvents(List tezEvents, boolean isPendingEv pendingTaskEvents.add(tezEvent); } else { // event not from this vertex. must have come from source vertex. - int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId(); + int srcTaskIndex = sourceMeta.getTaskID().getId(); Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName()); Edge srcEdge = sourceVertices.get(edgeVertex); if (srcEdge == null) { @@ -4132,7 +4132,7 @@ private void handleRoutedTezEvents(List tezEvents, boolean isPendingEv Preconditions.checkArgument(target != null, "Event sent to unkown vertex: " + vmEvent.getTargetVertexName()); TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); - if (srcTaId.getTaskID().getVertexID().equals(vertexId)) { + if (srcTaId.getVertexID().equals(vertexId)) { // this is the producer tasks' vertex vmEvent.setProducerAttemptIdentifier( getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId)); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java index dfa6bbdf91..601d8975f0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; public class DAGHistoryEvent { @@ -43,4 +44,7 @@ public TezDAGID getDagID() { return this.dagID; } + public ApplicationId getApplicationId() { + return getDagID().getApplicationId(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java index 0a7ef561f7..5d43818d7b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java @@ -25,6 +25,7 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -183,6 +184,10 @@ public TezDAGID getDagID() { return dagID; } + public ApplicationId getApplicationId() { + return dagID.getApplicationId(); + } + public long getStartTime() { return startTime; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java index 9e6c8b282b..f8a91d7dfc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java @@ -23,6 +23,7 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TezDAGID; @@ -105,6 +106,10 @@ public TezDAGID getDagID() { return dagID; } + public ApplicationId getApplicationId() { + return dagID.getApplicationId(); + } + public String getUser() { return user; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java index f1fdcac809..ba7f467f3c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java @@ -22,6 +22,7 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -101,6 +102,10 @@ public TezDAGID getDagID() { return dagID; } + public ApplicationId getApplicationId() { + return dagID.getApplicationId(); + } + public String getUser() { return user; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index e04ee80188..22a68f2791 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -24,6 +24,7 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -178,6 +179,10 @@ public TezDAGID getDagID() { return dagID; } + public ApplicationId getApplicationId() { + return dagID.getApplicationId(); + } + public ApplicationAttemptId getApplicationAttemptId() { return applicationAttemptId; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 18596495be..cf529ddccb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -24,9 +24,15 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +49,6 @@ import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.utils.TezEventUtils; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto; @@ -311,6 +315,22 @@ public TezTaskAttemptID getTaskAttemptID() { return taskAttemptId; } + public ApplicationId getApplicationId() { + return taskAttemptId.getApplicationId(); + } + + public TezDAGID getDAGId() { + return taskAttemptId.getDAGId(); + } + + public TezVertexID getVertexID() { + return taskAttemptId.getVertexID(); + } + + public TezTaskID getTaskID() { + return taskAttemptId.getTaskID(); + } + public TezCounters getCounters() { return tezCounters; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index a49e47c97f..fe367d7669 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -22,12 +22,16 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptStartedProto; public class TaskAttemptStartedEvent implements HistoryEvent { @@ -114,7 +118,23 @@ public String toString() { } public TezTaskAttemptID getTaskAttemptID() { - return this.taskAttemptId; + return taskAttemptId; + } + + public ApplicationId getApplicationId() { + return taskAttemptId.getApplicationId(); + } + + public TezDAGID getDAGId() { + return taskAttemptId.getDAGId(); + } + + public TezVertexID getVertexID() { + return taskAttemptId.getVertexID(); + } + + public TezTaskID getTaskID() { + return taskAttemptId.getTaskID(); } public long getStartTime() { @@ -140,5 +160,4 @@ public String getCompletedLogsUrl() { public String getNodeHttpAddress() { return nodeHttpAddress; } - } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java index dd7afdfc7c..7477093ebe 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java @@ -22,8 +22,11 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.counters.TezCounters; @@ -164,6 +167,18 @@ public TezTaskID getTaskID() { return taskID; } + public ApplicationId getApplicationId() { + return taskID.getApplicationId(); + } + + public TezDAGID getDAGId() { + return taskID.getDAGId(); + } + + public TezVertexID getVertexID() { + return taskID.getVertexID(); + } + public TaskState getState() { return state; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java index cc629699a4..72f93621dd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java @@ -22,10 +22,13 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskStartedProto; public class TaskStartedEvent implements HistoryEvent { @@ -108,6 +111,18 @@ public TezTaskID getTaskID() { return taskID; } + public ApplicationId getApplicationId() { + return taskID.getApplicationId(); + } + + public TezDAGID getDAGId() { + return taskID.getDAGId(); + } + + public TezVertexID getVertexID() { + return taskID.getVertexID(); + } + public long getScheduledTime() { return scheduledTime; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java index a2e20397cf..250a5cd459 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java @@ -24,11 +24,13 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto; @@ -186,6 +188,14 @@ public TezVertexID getVertexID() { return this.vertexID; } + public TezDAGID getDAGId() { + return vertexID.getDAGId(); + } + + public ApplicationId getApplicationId() { + return vertexID.getApplicationId(); + } + public int getNumTasks() { return numTasks; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index 58cb628b28..112753873f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -24,7 +24,9 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; +import org.apache.tez.dag.records.TezDAGID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.counters.TezCounters; @@ -161,6 +163,14 @@ public TezVertexID getVertexID() { return this.vertexID; } + public ApplicationId getApplicationId() { + return vertexID.getApplicationId(); + } + + public TezDAGID getDAGId() { + return vertexID.getDAGId(); + } + public VertexState getState() { return this.state; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java index e7452e6770..0f53cc7f7c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java @@ -25,6 +25,7 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; @@ -34,6 +35,7 @@ import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.utils.TezEventUtils; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto; @@ -181,7 +183,15 @@ public String toString() { } public TezVertexID getVertexID() { - return this.vertexID; + return vertexID; + } + + public TezDAGID getDAGId() { + return vertexID.getDAGId(); + } + + public ApplicationId getApplicationId() { + return vertexID.getApplicationId(); } public long getInitRequestedTime() { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java index 4a3e05f1f4..ebc87f9153 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java @@ -22,9 +22,11 @@ import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexStartedProto; @@ -95,7 +97,15 @@ public String toString() { } public TezVertexID getVertexID() { - return this.vertexID; + return vertexID; + } + + public TezDAGID getDAGId() { + return vertexID.getDAGId(); + } + + public ApplicationId getApplicationId() { + return vertexID.getApplicationId(); } public long getStartRequestedTime() { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index e60575f38e..90930386a2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -602,7 +602,7 @@ private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID); JSONObject taskEntity = new JSONObject(); - taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString()); + taskEntity.put(ATSConstants.ENTITY, event.getTaskID().toString()); taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name()); relatedEntities.put(nodeEntity); @@ -667,7 +667,7 @@ private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws // Related entities JSONArray relatedEntities = new JSONArray(); JSONObject vertexEntity = new JSONObject(); - vertexEntity.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString()); + vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().toString()); vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name()); relatedEntities.put(vertexEntity); jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities); @@ -775,7 +775,7 @@ private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent e // Related entities JSONArray relatedEntities = new JSONArray(); JSONObject vertexEntity = new JSONObject(); - vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString()); + vertexEntity.put(ATSConstants.ENTITY, event.getDAGId().toString()); vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name()); relatedEntities.put(vertexEntity); jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities); @@ -815,7 +815,7 @@ private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) // Related entities JSONArray relatedEntities = new JSONArray(); JSONObject vertexEntity = new JSONObject(); - vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString()); + vertexEntity.put(ATSConstants.ENTITY, event.getDAGId().toString()); vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name()); relatedEntities.put(vertexEntity); jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java index b6f50301c7..9c3dfeb999 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -380,7 +380,7 @@ public void testBasicSpeculation(boolean withProgress) throws Exception { .getValue()); Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) .getValue()); - org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID()); + org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getVertexID()); Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) .getValue()); } @@ -517,7 +517,7 @@ public void testBasicSpeculationNotUseful() throws Exception { .getValue()); Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) .getValue()); - org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID()); + org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getVertexID()); Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) .getValue()); tezClient.stop(); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 7611f1c44e..87024c2ae3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -176,7 +176,7 @@ private class TaskEventDispatcher implements EventHandler { @SuppressWarnings("unchecked") @Override public void handle(TaskEvent event) { - Vertex vertex = dag.getVertex(event.getTaskID().getVertexID()); + Vertex vertex = dag.getVertex(event.getVertexID()); Task task = vertex.getTask(event.getTaskID()); ((EventHandler) task).handle(event); } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 2f2b3b8b8f..c504626bcc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -230,9 +230,9 @@ private class TaskEventDispatcher implements EventHandler { @SuppressWarnings("unchecked") @Override public void handle(TaskEvent event) { - TezDAGID id = event.getTaskID().getVertexID().getDAGId(); + TezDAGID id = event.getDAGId(); DAGImpl handler = chooseDAG(id); - Vertex vertex = handler.getVertex(event.getTaskID().getVertexID()); + Vertex vertex = handler.getVertex(event.getVertexID()); Task task = vertex.getTask(event.getTaskID()); ((EventHandler)task).handle(event); } @@ -249,10 +249,10 @@ public void handle(TaskAttemptEvent event) { private class TaskAttemptEventDisptacher2 implements EventHandler { @Override public void handle(TaskAttemptEvent event) { - TezDAGID id = event.getTaskAttemptID().getTaskID().getVertexID().getDAGId(); + TezDAGID id = event.getDAGId(); DAGImpl handler = chooseDAG(id); - Vertex vertex = handler.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()); - Task task = vertex.getTask(event.getTaskAttemptID().getTaskID()); + Vertex vertex = handler.getVertex(event.getVertexID()); + Task task = vertex.getTask(event.getTaskID()); TaskAttempt ta = task.getAttempt(event.getTaskAttemptID()); ((EventHandler)ta).handle(event); } @@ -264,7 +264,7 @@ private class VertexEventDispatcher @SuppressWarnings("unchecked") @Override public void handle(VertexEvent event) { - TezDAGID id = event.getVertexId().getDAGId(); + TezDAGID id = event.getDAGId(); DAGImpl handler = chooseDAG(id); Vertex vertex = handler.getVertex(event.getVertexId()); ((EventHandler) vertex).handle(event); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 95ea8a030e..066a1512ed 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -209,7 +209,7 @@ private class TaskEventDispatcher implements EventHandler { @SuppressWarnings("unchecked") @Override public void handle(TaskEvent event) { - TaskImpl task = (TaskImpl) dag.getVertex(event.getTaskID().getVertexID()) + TaskImpl task = (TaskImpl) dag.getVertex(event.getVertexID()) .getTask(event.getTaskID()); task.handle(event); } @@ -220,8 +220,7 @@ private class TaskAttemptEventDispatcher implements EventHandler { @Override public void handle(TaskAttemptEvent event) { - Vertex vertex = dag.getVertex(event.getTaskAttemptID().getTaskID() - .getVertexID()); + Vertex vertex = dag.getVertex(event.getVertexID()); Task task = vertex.getTask(event.getTaskAttemptID().getTaskID()); TaskAttempt ta = task.getAttempt(event.getTaskAttemptID()); ((EventHandler) ta).handle(event); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java index 07c361aded..47b999da8e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -39,7 +39,7 @@ public class TestDAGScheduler { - class MockEventHandler implements EventHandler { + static class MockEventHandler implements EventHandler { TaskAttemptEventSchedule event; List events = Lists.newLinkedList(); @Override @@ -114,17 +114,17 @@ public void testConcurrencyLimit() { // schedule beyond limit and it gets scheduled mockAttempt = mock(TaskAttempt.class); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(1, mockEventHandler.events.size()); mockAttempt = mock(TaskAttempt.class); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(2, mockEventHandler.events.size()); mockAttempt = mock(TaskAttempt.class); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(3, mockEventHandler.events.size()); @@ -134,12 +134,15 @@ public void testConcurrencyLimit() { int completed = 0; int requested = 0; int scheduled = 0; + TezTaskAttemptID taskAttemptId; scheduler.addVertexConcurrencyLimit(vId1, 2); // effective // schedule beyond limit and it gets buffered mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + taskAttemptId = TezTaskAttemptID.getInstance(tId1, requested++); + when(mockAttempt.getID()).thenReturn(taskAttemptId); + when(mockAttempt.getVertexID()).thenReturn(taskAttemptId.getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled @@ -149,7 +152,9 @@ public void testConcurrencyLimit() { mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + taskAttemptId = TezTaskAttemptID.getInstance(tId1, requested++); + when(mockAttempt.getID()).thenReturn(taskAttemptId); + when(mockAttempt.getVertexID()).thenReturn(taskAttemptId.getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled @@ -159,14 +164,18 @@ public void testConcurrencyLimit() { mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + taskAttemptId = TezTaskAttemptID.getInstance(tId1, requested++); + when(mockAttempt.getID()).thenReturn(taskAttemptId); + when(mockAttempt.getVertexID()).thenReturn(taskAttemptId.getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + taskAttemptId = TezTaskAttemptID.getInstance(tId1, requested++); + when(mockAttempt.getID()).thenReturn(taskAttemptId); + when(mockAttempt.getVertexID()).thenReturn(taskAttemptId.getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered @@ -186,19 +195,17 @@ public void testConcurrencyLimit() { scheduled++; scheduler.taskCompleted(new DAGEventSchedulerUpdate( - DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++))); + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed))); Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no extra scheduling mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled Assert.assertEquals(mockAttempts.get(scheduled).getID(), mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order - scheduled++; - } @Test(timeout=5000) @@ -220,7 +227,6 @@ public void testConcurrencyLimitWithKilledNonRunningTask() { mockEventHandler); List mockAttempts = Lists.newArrayList(); - int completed = 0; int requested = 0; int scheduled = 0; scheduler.addVertexConcurrencyLimit(vId0, 1); // effective @@ -228,7 +234,7 @@ public void testConcurrencyLimitWithKilledNonRunningTask() { // schedule beyond limit and it gets buffered mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled @@ -238,14 +244,14 @@ public void testConcurrencyLimitWithKilledNonRunningTask() { mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered mockAttempt = mock(TaskAttempt.class); mockAttempts.add(mockAttempt); - when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + when(mockAttempt.getVertexID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++).getVertexID()); scheduler.scheduleTask(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java index 63137c716a..6f0f47fad3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java @@ -164,9 +164,9 @@ public void testSourceRequestDelayed() { for (Event raw : args.getAllValues()) { TaskAttemptEventSchedule event = (TaskAttemptEventSchedule) raw; if (count < vertices[2].getTotalTasks() - 3) { - assertEquals(2, event.getTaskAttemptID().getTaskID().getVertexID().getId()); + assertEquals(2, event.getVertexID().getId()); } else { - assertEquals(4, event.getTaskAttemptID().getTaskID().getVertexID().getId()); + assertEquals(4, event.getVertexID().getId()); } count++; } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 7a2a05fb26..4071dc7457 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -1876,6 +1876,7 @@ taListener, taskConf, new SystemClock(), TezTaskID destTaskID = mock(TezTaskID.class); TezVertexID destVertexID = mock(TezVertexID.class); when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(mockDestId1.getVertexID()).thenReturn(destVertexID); when(destTaskID.getVertexID()).thenReturn(destVertexID); Vertex destVertex = mock(VertexImpl.class); when(destVertex.getRunningTasks()).thenReturn(11); @@ -1901,14 +1902,14 @@ taListener, taskConf, new SystemClock(), destTaskID = mock(TezTaskID.class); destVertexID = mock(TezVertexID.class); when(mockDestId2.getTaskID()).thenReturn(destTaskID); + when(mockDestId2.getVertexID()).thenReturn(destVertexID); when(destTaskID.getVertexID()).thenReturn(destVertexID); destVertex = mock(VertexImpl.class); when(destVertex.getRunningTasks()).thenReturn(11); when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); - assertEquals("Task attempt is not in FAILED state", taImpl.getState(), - TaskAttemptState.FAILED); + assertEquals("Task attempt is not in FAILED state", TaskAttemptState.FAILED, taImpl.getState()); assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause()); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); @@ -1964,6 +1965,7 @@ taListener, taskConf, new SystemClock(), mockMeta = mock(EventMetaData.class); mockDestId1 = mock(TezTaskAttemptID.class); when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(mockDestId1.getVertexID()).thenReturn(destVertexID); when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); tzEvent = new TezEvent(mockReEvent, mockMeta); //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as @@ -2006,6 +2008,7 @@ taListener, taskConf, new SystemClock(), mockMeta = mock(EventMetaData.class); mockDestId1 = mock(TezTaskAttemptID.class); when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(mockDestId1.getVertexID()).thenReturn(destVertexID); when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); tzEvent = new TezEvent(mockReEvent, mockMeta); when(mockClock.getTime()).thenReturn(1000L); @@ -2102,6 +2105,7 @@ taListener, taskConf, new SystemClock(), TezTaskID destTaskID = mock(TezTaskID.class); TezVertexID destVertexID = mock(TezVertexID.class); when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(mockDestId1.getVertexID()).thenReturn(destVertexID); when(destTaskID.getVertexID()).thenReturn(destVertexID); Vertex destVertex = mock(VertexImpl.class); when(destVertex.getRunningTasks()).thenReturn(5); @@ -2111,8 +2115,7 @@ taListener, taskConf, new SystemClock(), taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); // failure threshold is met due to running tasks. state is FAILED - assertEquals("Task attempt is not in FAILED state", taImpl.getState(), - TaskAttemptState.FAILED); + assertEquals("Task attempt is not in FAILED state", TaskAttemptState.FAILED, taImpl.getState()); } @SuppressWarnings("deprecation") @@ -2190,7 +2193,7 @@ private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch, InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource, null); TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class); when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class)); - when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class)); + when(destTaskAttemptId.getVertexID()).thenReturn(mock(TezVertexID.class)); when(appCtx.getCurrentDAG()).thenReturn(mock(DAG.class)); when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class))) .thenReturn(mock(Vertex.class)); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index fb7872ff9a..de95af8d65 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -386,7 +386,7 @@ private class TaskAttemptEventDispatcher implements EventHandler)task.getAttempt( event.getTaskAttemptID())).handle(event); @@ -399,7 +399,7 @@ private class TaskEventDispatcher implements EventHandler { @Override public void handle(TaskEvent event) { events.add(event); - VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID()); + VertexImpl vertex = vertexIdMap.get(event.getVertexID()); Task task = vertex.getTask(event.getTaskID()); if (task != null) { ((EventHandler)task).handle(event); diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index eef73a0d0a..7864e1c852 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -556,12 +556,12 @@ private void checkAndThrowExceptionForTests(SubmitWorkRequestProto request) thro } TaskSpec taskSpec = ProtoConverters.getTaskSpecfromProto(request.getTaskSpec()); - if (taskSpec.getTaskAttemptID().getTaskID().getId() == 0 && + if (taskSpec.getTaskID().getId() == 0 && taskSpec.getTaskAttemptID().getId() == 0) { LOG.info("Simulating Rejected work"); throw new RejectedExecutionException( "Simulating Rejected work for taskAttemptId=" + taskSpec.getTaskAttemptID()); - } else if (taskSpec.getTaskAttemptID().getTaskID().getId() == 1 && + } else if (taskSpec.getTaskID().getId() == 1 && taskSpec.getTaskAttemptID().getId() == 0) { LOG.info("Simulating Task Setup Failure during launch"); throw new TezException("Simulating Task Setup Failure during launch for taskAttemptId=" + diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java index 0f1b56d3d2..c4d273ec23 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java @@ -39,7 +39,7 @@ public static JobID toMRJobId(TezDAGID dagId) { public static TaskID toMRTaskId(TezTaskID taskid) { return new TaskID( - toMRJobId(taskid.getVertexID().getDAGId()), + toMRJobId(taskid.getDAGId()), taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE, taskid.getId()); } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java index e5e7022064..941c8732bd 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java @@ -53,8 +53,8 @@ public static org.apache.hadoop.mapred.TaskAttemptID createMockTaskAttemptID( public static org.apache.hadoop.mapred.TaskAttemptID createMockTaskAttemptIDFromTezTaskAttemptId(TezTaskAttemptID tezTaId, boolean isMap) { - TezVertexID vId = tezTaId.getTaskID().getVertexID(); - ApplicationId appId = vId.getDAGId().getApplicationId(); + TezVertexID vId = tezTaId.getVertexID(); + ApplicationId appId = vId.getApplicationId(); return new org.apache.hadoop.mapred.TaskAttemptID( new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp()) + String.valueOf(vId.getId()), appId.getId(), @@ -65,7 +65,7 @@ public static org.apache.hadoop.mapred.TaskAttemptID createMockTaskAttemptID( public static org.apache.hadoop.mapred.TaskID createMockTaskAttemptIDFromTezTaskId(TezTaskID tezTaId, boolean isMap) { TezVertexID vId = tezTaId.getVertexID(); - ApplicationId appId = vId.getDAGId().getApplicationId(); + ApplicationId appId = vId.getApplicationId(); return new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp()) + String.valueOf(vId.getId()), appId.getId(), isMap ? TaskType.MAP : TaskType.REDUCE, tezTaId.getId()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index 1a13168cb7..dbda689b64 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -321,8 +321,7 @@ public void initTask(LogicalOutput output) throws IOException, } this.mrReporter = new MRTaskReporter(processorContext); this.useNewApi = jobConf.getUseNewMapper(); - TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID() - .getVertexID().getDAGId(); + TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getDAGId(); this.jobContext = new JobContextImpl(jobConf, dagId, mrReporter); this.taskAttemptContext = diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java index c1711ce2cb..453c5fc70c 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java @@ -278,9 +278,9 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, case Constants.TEZ_TASK_ID: String taskName = entity; TezTaskID tezTaskID = TezTaskID.fromString(taskName); - if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) { + if (!tezDAGID.equals(tezTaskID.getDAGId())) { LOG.warn("{} does not belong to {} ('{}' != '{}')}", taskName, tezDAGID, tezDAGID, - tezTaskID.getVertexID().getDAGId()); + tezTaskID.getDAGId()); continue; } if (!taskJsonMap.containsKey(taskName)) { @@ -293,9 +293,9 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, case Constants.TEZ_TASK_ATTEMPT_ID: String taskAttemptName = entity; TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName); - if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) { + if (!tezDAGID.equals(tezAttemptId.getDAGId())) { LOG.warn("{} does not belong to {} ('{}' != '{}')}", taskAttemptName, tezDAGID, tezDAGID, - tezAttemptId.getTaskID().getVertexID().getDAGId()); + tezAttemptId.getDAGId()); continue; } if (!attemptJsonMap.containsKey(taskAttemptName)) { diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java index af8e292b24..9f3881c8b2 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java @@ -90,9 +90,8 @@ protected void linkParsedContents() { //Link task to task attempt TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(attemptInfo .getTaskAttemptId()); - VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getTaskID() - .getVertexID().toString()); - Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId.getTaskID() + VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getVertexID().toString()); + Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId .getVertexID().toString() + " is not present in DAG"); TaskInfo taskInfo = vertexInfo.getTask(taskAttemptId.getTaskID().toString()); attemptInfo.setTaskInfo(taskInfo); diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java index d211feb0e3..b6e7744ee2 100644 --- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java +++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java @@ -100,12 +100,12 @@ private Set convertToTimelineEntityGroupIds(String entity } else if (entityType.equals(EntityTypes.TEZ_TASK_ID.name())) { TezTaskID taskID = TezTaskID.fromString(entityId); if (taskID != null) { - return createTimelineEntityGroupIds(taskID.getVertexID().getDAGId()); + return createTimelineEntityGroupIds(taskID.getDAGId()); } } else if (entityType.equals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())) { TezTaskAttemptID taskAttemptID = TezTaskAttemptID.fromString(entityId); if (taskAttemptID != null) { - return createTimelineEntityGroupIds(taskAttemptID.getTaskID().getVertexID().getDAGId()); + return createTimelineEntityGroupIds(taskAttemptID.getDAGId()); } } else if (entityType.equals(EntityTypes.TEZ_CONTAINER_ID.name())) { String cId = entityId; diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java index a71f0d8db0..8a1293152f 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java @@ -313,7 +313,7 @@ public TimelineEntityGroupId getGroupId(DAGHistoryEvent event) { String entityGroupId = numDagsPerGroup > 1 ? event.getDagID().getGroupId(numDagsPerGroup) : event.getDagID().toString(); - return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId); + return TimelineEntityGroupId.newInstance(event.getApplicationId(), entityGroupId); case APP_LAUNCHED: case AM_LAUNCHED: case AM_STARTED: diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index c40d3a8f78..1b06fa2c69 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -298,7 +298,7 @@ private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) { atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); if (event.getDAGPlan().hasCallerContext() @@ -355,7 +355,7 @@ private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent eve atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime()); @@ -383,7 +383,7 @@ private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) { atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); @@ -412,7 +412,7 @@ private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); + event.getApplicationId().toString()); if (event.getDAGPlan().hasCallerContext() && event.getDAGPlan().getCallerContext().hasCallerId()) { @@ -477,13 +477,13 @@ private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishe atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + event.getDAGId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().toString()); + event.getVertexID().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); + event.getTaskID().toString()); TimelineEvent finishEvt = new TimelineEvent(); finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name()); @@ -542,16 +542,16 @@ private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedE atsEntity.setStartTime(event.getStartTime()); atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); + event.getTaskID().toString()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + event.getDAGId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().toString()); + event.getVertexID().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); + event.getTaskID().toString()); TimelineEvent startEvt = new TimelineEvent(); startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name()); @@ -579,11 +579,11 @@ private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskID().getVertexID().getDAGId().toString()); + event.getDAGId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); + event.getVertexID().toString()); TimelineEvent finishEvt = new TimelineEvent(); finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name()); @@ -614,14 +614,14 @@ private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) { atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); + event.getVertexID().toString()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskID().getVertexID().getDAGId().toString()); + event.getDAGId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); + event.getVertexID().toString()); TimelineEvent startEvt = new TimelineEvent(); startEvt.setEventType(HistoryEventType.TASK_STARTED.name()); @@ -643,9 +643,9 @@ private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent eve atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); + event.getDAGId().toString()); TimelineEvent finishEvt = new TimelineEvent(); finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name()); @@ -685,12 +685,12 @@ private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEve atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); + event.getDAGId().toString()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); + event.getDAGId().toString()); TimelineEvent initEvt = new TimelineEvent(); initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name()); @@ -718,9 +718,9 @@ private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); + event.getDAGId().toString()); TimelineEvent startEvt = new TimelineEvent(); startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name()); @@ -741,9 +741,9 @@ private static TimelineEntity convertVertexReconfigureDoneEvent( atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); + event.getApplicationId().toString()); atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); + event.getDAGId().toString()); TimelineEvent updateEvt = new TimelineEvent(); updateEvt.setEventType(HistoryEventType.VERTEX_CONFIGURE_DONE.name()); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java index 0542b33d03..1848d98e9a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java @@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.util.StringInterner; /** @@ -87,6 +88,10 @@ public TezTaskAttemptID getTaskAttemptID() { return taskAttemptID; } + public TezTaskID getTaskID() { + return taskAttemptID.getTaskID(); + } + public String getTaskVertexName() { return taskVertexName; } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java index 63c251c054..35b252e27b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.util.StringInterner; import com.google.common.collect.Lists; @@ -128,7 +129,7 @@ public String getDAGName() { } public int getDagIdentifier() { - return taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); + return taskAttemptId.getDAGId().getId(); } public int getVertexParallelism() { @@ -143,6 +144,10 @@ public TezTaskAttemptID getTaskAttemptID() { return taskAttemptId; } + public TezTaskID getTaskID() { + return taskAttemptId.getTaskID(); + } + public ProcessorDescriptor getProcessorDescriptor() { return processorDescriptor; } @@ -266,5 +271,4 @@ public String toString() { } return sb.toString(); } - } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index a47dac1e0a..b119bfbc80 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -118,8 +118,7 @@ public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptN @Override public ApplicationId getApplicationId() { - return taskAttemptID.getTaskID().getVertexID().getDAGId() - .getApplicationId(); + return taskAttemptID.getApplicationId(); } @Override @@ -154,12 +153,12 @@ public String getTaskVertexName() { @Override public int getTaskVertexIndex() { - return taskAttemptID.getTaskID().getVertexID().getId(); + return taskAttemptID.getVertexID().getId(); } @Override public int getDagIdentifier() { - return taskAttemptID.getTaskID().getVertexID().getDAGId().getId(); + return taskAttemptID.getDAGId().getId(); } @Override diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index c82355a9fa..8379bf701f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -372,8 +372,7 @@ public URI apply(TezLocalResource input) { private void cleanupOnTaskChanged(ContainerTask containerTask) { Preconditions.checkState(!containerTask.shouldDie()); Preconditions.checkState(containerTask.getTaskSpec() != null); - TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID() - .getVertexID(); + TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getVertexID(); if (lastVertexID != null) { if (!lastVertexID.equals(newVertexID)) { objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX); diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java index 3ed386e346..186ab7e659 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -334,7 +334,7 @@ public boolean match(HistoryEvent incomingEvent) { TaskStartedEvent otherEvent = (TaskStartedEvent) incomingEvent; TaskStartedEvent conditionEvent = (TaskStartedEvent) event; // compare vertexId and taskId - return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId() + return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId() && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId(); } break; @@ -344,7 +344,7 @@ public boolean match(HistoryEvent incomingEvent) { TaskFinishedEvent otherEvent = (TaskFinishedEvent) incomingEvent; TaskFinishedEvent conditionEvent = (TaskFinishedEvent) event; // compare vertexId and taskId - return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId() + return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId() && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId(); } break; @@ -354,9 +354,9 @@ public boolean match(HistoryEvent incomingEvent) { TaskAttemptStartedEvent otherEvent = (TaskAttemptStartedEvent) incomingEvent; TaskAttemptStartedEvent conditionEvent = (TaskAttemptStartedEvent) event; // compare vertexId, taskId & taskAttemptId - return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() - == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId() - && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId() + return otherEvent.getVertexID().getId() + == conditionEvent.getVertexID().getId() + && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId() && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId(); } break; @@ -366,9 +366,9 @@ public boolean match(HistoryEvent incomingEvent) { TaskAttemptFinishedEvent otherEvent = (TaskAttemptFinishedEvent) incomingEvent; TaskAttemptFinishedEvent conditionEvent = (TaskAttemptFinishedEvent) event; // compare vertexId, taskId & taskAttemptId - return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() - == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId() - && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId() + return otherEvent.getVertexID().getId() + == conditionEvent.getVertexID().getId() + && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId() && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId(); } break; diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 6d3ab1c711..7fe3b3acc1 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -463,9 +463,9 @@ private List findTaskAttemptFinishedEvent( if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent) historyEvent; - if (taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID() + if (taFinishedEvent.getVertexID() .getId() == vertexId - && taFinishedEvent.getTaskAttemptID().getTaskID().getId() == taskId) { + && taFinishedEvent.getTaskID().getId() == taskId) { resultEvents.add(taFinishedEvent); } }