Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.hadoop.yarn.api.records.ApplicationId;

/**
* TezTaskAttemptID represents the immutable and unique identifier for
Expand Down Expand Up @@ -76,6 +77,18 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
public TezTaskID getTaskID() {
return taskId;
}

public TezVertexID getVertexID() {
return taskId.getVertexID();
}

public TezDAGID getDAGId() {
return taskId.getDAGId();
}

public ApplicationId getApplicationId() {
return taskId.getApplicationId();
}

@Override
public boolean equals(Object o) {
Expand Down Expand Up @@ -162,5 +175,4 @@ public static TezTaskAttemptID fromString(String taIdStr) {
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;

Expand Down Expand Up @@ -83,6 +84,14 @@ public TezVertexID getVertexID() {
return vertexId;
}

public TezDAGID getDAGId() {
return vertexId.getDAGId();
}

public ApplicationId getApplicationId() {
return vertexId.getApplicationId();
}

@Override
public boolean equals(Object o) {
if (!super.equals(o))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;

Expand Down Expand Up @@ -83,6 +84,11 @@ public TezDAGID getDAGId() {
return dagId;
}


public ApplicationId getApplicationId() {
return dagId.getApplicationId();
}

@Override
public boolean equals(Object o) {
if (!super.equals(o))
Expand Down Expand Up @@ -158,5 +164,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}

}
12 changes: 6 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2187,12 +2187,12 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskID().getVertexID()).
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
Expand All @@ -2217,13 +2217,13 @@ private class TaskAttemptEventDispatcher
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
Expand All @@ -2236,7 +2236,7 @@ private class VertexEventDispatcher
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getVertexId().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Expand Down
12 changes: 6 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -846,19 +846,19 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
case TASK_STARTED:
{
TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID());
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
taskRecoveryData.taskStartedEvent = taskStartedEvent;
break;
}
case TASK_FINISHED:
{
TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID());
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
break;
Expand All @@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
taStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand All @@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
taFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
}
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
Expand Down Expand Up @@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {

DAG job = context.getCurrentDAG();
Task task =
job.getVertex(taskAttemptId.getTaskID().getVertexID()).
job.getVertex(taskAttemptId.getVertexID()).
getTask(taskAttemptId.getTaskID());
return task.canCommit(taskAttemptId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
public void scheduleTask(DAGEventSchedulerUpdate event) {
VertexInfo vInfo = null;
if (vertexInfo != null) {
vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
vInfo = vertexInfo.get(event.getVertexID());
}
scheduleTaskWithLimit(event, vInfo);
}
Expand All @@ -71,7 +71,7 @@ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vIn
public void taskCompleted(DAGEventSchedulerUpdate event) {
taskCompletedEx(event);
if (vertexInfo != null) {
VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
VertexInfo vInfo = vertexInfo.get(event.getVertexID());
if (vInfo != null) {
if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
vInfo.concurrency--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId)
Iterator<TezEvent> eventIterator = events.iterator();
while (eventIterator.hasNext()) {
TezEvent tezEvent = eventIterator.next();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
if (taskIndex == taskId.getId()) {
// Process only if there's a pending event for the specific succeeded task
Expand All @@ -476,7 +476,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
for (TezEvent tezEvent : tezEvents) {
String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();

Map<Integer, Integer> vertexSuccessfulAttemptMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezVertexID;

public class DAGEventSchedulerUpdate extends DAGEvent {

public enum UpdateType {
TA_SCHEDULE,
TA_COMPLETED
Expand All @@ -44,4 +45,8 @@ public UpdateType getUpdateType() {
public TaskAttempt getAttempt() {
return attempt;
}

public TezVertexID getVertexID() {
return attempt.getVertexID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttempt

public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
long timestamp, boolean justStarted) {
super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getTaskID().getVertexID());
super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getVertexID());
this.id = taId;
this.state = state;
this.timestamp = timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;

/**
* This class encapsulates task attempt related events.
Expand All @@ -42,6 +45,18 @@ public TaskAttemptEvent(TezTaskAttemptID id, TaskAttemptEventType type) {
public TezTaskAttemptID getTaskAttemptID() {
return attemptID;
}

public TezDAGID getDAGId() {
return attemptID.getDAGId();
}

public TezVertexID getVertexID() {
return attemptID.getVertexID();
}

public TezTaskID getTaskID() {
return attemptID.getTaskID();
}

@Override
public int getSerializingHash() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;

/**
* this class encapsulates task related events.
Expand All @@ -37,7 +39,16 @@ public TaskEvent(TezTaskID taskId, TaskEventType type) {
public TezTaskID getTaskID() {
return taskId;
}



public TezDAGID getDAGId() {
return taskId.getDAGId();
}

public TezVertexID getVertexID() {
return taskId.getVertexID();
}

@Override
public int getSerializingHash() {
return taskId.getSerializingHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;

/**
Expand All @@ -37,4 +38,8 @@ public VertexEvent(TezVertexID vertexId, VertexEventType type) {
public TezVertexID getVertexId() {
return vertexId;
}

public TezDAGID getDAGId() {
return vertexId.getDAGId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class VertexEventTaskAttemptCompleted extends VertexEvent {

public VertexEventTaskAttemptCompleted(TezTaskAttemptID taskAttemptId,
TaskAttemptStateInternal state) {
super(taskAttemptId.getTaskID().getVertexID(),
super(taskAttemptId.getVertexID(),
VertexEventType.V_TASK_ATTEMPT_COMPLETED);
this.attemptId = taskAttemptId;
this.attempState = state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ public VertexStatusBuilder getVertexStatus(String vertexName,
}

public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) {
return (TaskAttemptImpl) getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID())
return (TaskAttemptImpl) getVertex(taId.getVertexID()).getTask(taId.getTaskID())
.getAttempt(taId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ public TezTaskID getTaskID() {

@Override
public TezVertexID getVertexID() {
return attemptId.getTaskID().getVertexID();
return attemptId.getVertexID();
}

@Override
Expand Down Expand Up @@ -884,26 +884,26 @@ public void handle(TaskAttemptEvent event) {
LOG.error("Can't handle this event at current state for "
+ this.attemptId, e);
eventHandler.handle(new DAGEventDiagnosticsUpdate(
this.attemptId.getTaskID().getVertexID().getDAGId(),
getDAGID(),
"Invalid event " + event.getType() +
" on TaskAttempt " + this.attemptId));
eventHandler.handle(
new DAGEvent(
this.attemptId.getTaskID().getVertexID().getDAGId(),
getDAGID(),
DAGEventType.INTERNAL_ERROR)
);
} catch (RuntimeException e) {
LOG.error("Uncaught exception when handling event " + event.getType()
+ " at current state " + oldState + " for "
+ this.attemptId, e);
eventHandler.handle(new DAGEventDiagnosticsUpdate(
this.attemptId.getTaskID().getVertexID().getDAGId(),
getDAGID(),
"Uncaught exception when handling event " + event.getType()
+ " on TaskAttempt " + this.attemptId
+ " at state " + oldState + ", error=" + e.getMessage()));
eventHandler.handle(
new DAGEvent(
this.attemptId.getTaskID().getVertexID().getDAGId(),
getDAGID(),
DAGEventType.INTERNAL_ERROR)
);
}
Expand Down Expand Up @@ -1856,7 +1856,7 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;

int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex(
failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
failedDestTaId.getVertexID()).getRunningTasks();
float failureFraction =
runningTasks > 0 ? ((float) sourceAttempt.uniquefailedOutputReports.size()) / runningTasks : 0;
boolean withinFailureFractionLimits =
Expand Down
Loading