From 779f2ffedd5e0ee5ce7abaf13ae5bf20d08941e0 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 27 Jul 2023 19:31:13 +0530 Subject: [PATCH 1/8] TEZ-1037: Replace multiple members in TaskAttemptImpl for container related stuff with a single reference to the container. Change-Id: I987e92f3cc504f0701a2b7ebde53aac62daf84fc --- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 82 ++++++++----------- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +- 2 files changed, 35 insertions(+), 49 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 289f1a1887..d82ecde86f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -187,14 +187,8 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private String trackerName; private int httpPort; - // TODO Can these be replaced by the container object TEZ-1037 - private Container container; + protected Container container; private long allocationTime; - private ContainerId containerId; - protected NodeId containerNodeId; - private String nodeHttpAddress; - private String nodeRackName; - private final Vertex vertex; private final Task task; private final TaskLocationHint locationHint; @@ -614,8 +608,8 @@ public TaskAttemptReport getReport() { result.setContainerId(this.getAssignedContainerID()); result.setNodeManagerHost(trackerName); result.setNodeManagerHttpPort(httpPort); - if (this.containerNodeId != null) { - result.setNodeManagerPort(this.containerNodeId.getPort()); + if (this.container.getNodeId() != null) { + result.setNodeManagerPort(this.container.getNodeId().getPort()); } return result; } finally { @@ -714,7 +708,7 @@ public boolean isFinished() { public ContainerId getAssignedContainerID() { readLock.lock(); try { - return containerId; + return container.getId(); } finally { readLock.unlock(); } @@ -734,7 +728,7 @@ public Container getAssignedContainer() { public String getAssignedContainerMgrAddress() { readLock.lock(); try { - return containerNodeId.toString(); + return container.getNodeId().toString(); } finally { readLock.unlock(); } @@ -744,7 +738,7 @@ public String getAssignedContainerMgrAddress() { public NodeId getNodeId() { readLock.lock(); try { - return containerNodeId; + return container.getNodeId(); } finally { readLock.unlock(); } @@ -756,7 +750,7 @@ public NodeId getNodeId() { public String getNodeHttpAddress() { readLock.lock(); try { - return nodeHttpAddress; + return StringInterner.intern(container.getNodeHttpAddress()); } finally { readLock.unlock(); } @@ -769,7 +763,7 @@ public String getNodeHttpAddress() { public String getNodeRackName() { this.readLock.lock(); try { - return this.nodeRackName; + return StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()); } finally { this.readLock.unlock(); } @@ -1136,8 +1130,8 @@ protected void logJobHistoryAttemptStarted() { String completedLogsUrl = getCompletedLogsUrl(); TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), - launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + launchTime, container.getId(), container.getNodeId(), + inProgressLogsUrl, completedLogsUrl, StringInterner.intern(container.getNodeHttpAddress())); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -1174,8 +1168,8 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion( String completedLogsUrl = null; if (finishTime <= 0) { finishTime = clock.getTime(); // comes here in case it was terminated before launch - unsuccessfulContainerId = containerId; - unsuccessfulContainerNodeId = containerNodeId; + unsuccessfulContainerId = container != null ? container.getId() : null; + unsuccessfulContainerNodeId = container != null ? container.getNodeId() : null; inProgressLogsUrl = getInProgressLogsUrl(); completedLogsUrl = getCompletedLogsUrl(); } @@ -1186,8 +1180,9 @@ attemptId, getVertex().getName(), getLaunchTime(), terminationCause, StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, - taGeneratedEvents, creationTime, creationCausalTA, allocationTime, - unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + taGeneratedEvents, creationTime, creationCausalTA, allocationTime, unsuccessfulContainerId, + unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, + StringInterner.intern(container != null ? container.getNodeHttpAddress() : null)); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1199,9 +1194,9 @@ private String getInProgressLogsUrl() { TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (containerId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); - inProgressLogsUrl = nodeHttpAddress + if (container != null && container.getId() != null && container.getNodeHttpAddress() != null) { + final String containerIdStr = container.getId().toString(); + inProgressLogsUrl = StringInterner.intern(container.getNodeHttpAddress()) + "/" + "node/containerlogs" + "/" + containerIdStr + "/" + this.appContext.getUser(); @@ -1209,7 +1204,7 @@ private String getInProgressLogsUrl() { } else { inProgressLogsUrl = appContext.getTaskCommunicatorManager().getInProgressLogsUrl( getVertex().getTaskCommunicatorIdentifier(), - attemptId, containerNodeId); + attemptId, container.getNodeId()); } return inProgressLogsUrl; } @@ -1220,15 +1215,15 @@ private String getCompletedLogsUrl() { TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (containerId != null && containerNodeId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); + if (container != null && container.getId() != null && container.getNodeId() != null && container.getNodeHttpAddress() != null) { + final String containerIdStr = container.getId().toString(); if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { String contextStr = "v_" + getVertex().getName() + "_" + this.attemptId.toString(); completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) - + "/" + containerNodeId.toString() + + "/" + container.getNodeId().toString() + "/" + containerIdStr + "/" + contextStr + "/" + this.appContext.getUser(); @@ -1237,7 +1232,7 @@ private String getCompletedLogsUrl() { } else { completedLogsUrl = appContext.getTaskCommunicatorManager().getCompletedLogsUrl( getVertex().getTaskCommunicatorIdentifier(), - attemptId, containerNodeId); + attemptId, container.getNodeId()); } return completedLogsUrl; } @@ -1390,9 +1385,6 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; - ta.containerId = tEvent.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); } if (event instanceof TaskAttemptEventContainerTerminatedBySystem) { @@ -1402,9 +1394,6 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; - ta.containerId = tEvent.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); } if (ta.recoveryData == null || @@ -1444,25 +1433,20 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; - ta.containerId = event.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); - ta.nodeRackName = StringInterner.intern(RackResolver.resolve(ta.containerNodeId.getHost()) - .getNetworkLocation()); ta.lastNotifyProgressTimestamp = ta.clock.getTime(); ta.setLaunchTime(); // TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils - .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly? + .createSocketAddr(StringInterner.intern(container.getNodeHttpAddress())); // TODO: Costly? ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName()); ta.httpPort = nodeHttpInetAddr.getPort(); ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta)); LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted." - + " Is using containerId: [" + ta.containerId + "]" + " on NM: [" - + ta.containerNodeId + "]"); + + " Is using containerId: [" + ta.container.getId() + "]" + " on NM: [" + + ta.container.getNodeId() + "]"); // JobHistoryEvent. // The started event represents when the attempt was submitted to the executor. @@ -1470,9 +1454,10 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { // TODO Remove after HDFS-5098 // Compute LOCALITY counter for this task. - if (ta.taskHosts.contains(ta.containerNodeId.getHost())) { + if (ta.taskHosts.contains(ta.container.getNodeId().getHost())) { ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS; - } else if (ta.taskRacks.contains(ta.nodeRackName)) { + } else if (ta.taskRacks.contains( + StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()))) { ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS; } else { // Not computing this if the task does not have locality information. @@ -1531,9 +1516,10 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { super.transition(ta, event); // Inform the scheduler if (sendSchedulerEvent()) { - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper - .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), - ta instanceof DiagnosableEvent ? ((DiagnosableEvent)ta).getDiagnosticInfo() : null, + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container != null ? ta.container.getId() : null, + helper.getTaskAttemptState(), + TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), + ta instanceof DiagnosableEvent ? ((DiagnosableEvent) ta).getDiagnosticInfo() : null, ta.getVertex().getTaskSchedulerIdentifier())); } } @@ -1671,7 +1657,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptState.SUCCEEDED)); // Inform the Scheduler. - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index d0088bfc54..2d980923b7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -2315,7 +2315,7 @@ public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, boolean inputFailedReported = false; public MockTaskAttemptImpl setNodeId(NodeId nodeId) { - this.containerNodeId = nodeId; + this.container = Container.newInstance(null, nodeId, null, null, null, null); return this; } From 7520f5c27d992e5910f8956625f6ca701d7962d6 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 27 Jul 2023 21:33:09 +0530 Subject: [PATCH 2/8] Fix Test. Change-Id: I53d85bd7bd22283875122cda7b62af342f8cf689 --- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 17 ++++++++--------- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 -- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index d82ecde86f..1f71463894 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -187,7 +187,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private String trackerName; private int httpPort; - protected Container container; + Container container; private long allocationTime; private final Vertex vertex; private final Task task; @@ -619,11 +619,9 @@ public TaskAttemptReport getReport() { @Override public List getDiagnostics() { - List result = new ArrayList(); readLock.lock(); try { - result.addAll(diagnostics); - return result; + return new ArrayList(diagnostics); } finally { readLock.unlock(); } @@ -1139,7 +1137,7 @@ attemptId, getVertex().getName(), protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) { Preconditions.checkArgument(recoveryData == null || recoveryData.getTaskAttemptFinishedEvent() == null, - "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); + "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAttemptFinishedEvent"); if (getLaunchTime() == 0) return; TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( @@ -1157,7 +1155,7 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state, TaskFailureType taskFailureType) { Preconditions.checkArgument(recoveryData == null || recoveryData.getTaskAttemptFinishedEvent() == null, - "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); + "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAttemptFinishedEvent"); if (state == TaskAttemptState.FAILED && taskFailureType == null) { throw new IllegalStateException("FAILED state must be accompanied by a FailureType"); } @@ -1215,7 +1213,8 @@ private String getCompletedLogsUrl() { TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (container != null && container.getId() != null && container.getNodeId() != null && container.getNodeHttpAddress() != null) { + if (container != null && container.getId() != null && container.getNodeId() != null && + container.getNodeHttpAddress() != null) { final String containerIdStr = container.getId().toString(); if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) @@ -1634,7 +1633,7 @@ protected static class SucceededTransition implements public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { // If TaskAttempt is recovered to SUCCEEDED, send events generated by this TaskAttempt to vertex - // for its downstream consumers. For normal dag execution, the events are sent by TaskAttmeptListener + // for its downstream consumers. For normal dag execution, the events are sent by TaskAttemptListener // for performance consideration. if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) { TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData @@ -1657,7 +1656,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptState.SUCCEEDED)); // Inform the Scheduler. - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container!=null ? ta.container.getId() : null, TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 2d980923b7..deb4bd5341 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -138,8 +138,6 @@ @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttempt { - private static final Logger LOG = LoggerFactory.getLogger(TestTaskAttempt.class); - static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { From bec335f12a8fdc3ab00f88e27c320f521d298ebb Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 1 Aug 2023 13:56:49 +0530 Subject: [PATCH 3/8] Remove unused import. Change-Id: Ibee2288595810c5330181a6eb416ff34bdd283fd --- .../java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index deb4bd5341..e629481537 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -130,8 +130,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; From 86664dc0db7b128ea2527f6e67ccdb00d0b75c22 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 22 Aug 2023 21:59:23 +0530 Subject: [PATCH 4/8] Address Review Comments. --- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 44 +++--- .../tez/dag/app/dag/impl/TezContainer.java | 130 ++++++++++++++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +- 3 files changed, 151 insertions(+), 25 deletions(-) create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 1f71463894..badeff5e61 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -187,7 +187,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private String trackerName; private int httpPort; - Container container; + TezContainer container = new TezContainer(null); private long allocationTime; private final Vertex vertex; private final Task task; @@ -716,7 +716,7 @@ public ContainerId getAssignedContainerID() { public Container getAssignedContainer() { readLock.lock(); try { - return container; + return new TezContainer(container); } finally { readLock.unlock(); } @@ -748,7 +748,7 @@ public NodeId getNodeId() { public String getNodeHttpAddress() { readLock.lock(); try { - return StringInterner.intern(container.getNodeHttpAddress()); + return container.getNodeHttpAddress(); } finally { readLock.unlock(); } @@ -761,7 +761,7 @@ public String getNodeHttpAddress() { public String getNodeRackName() { this.readLock.lock(); try { - return StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()); + return container.getRackName(); } finally { this.readLock.unlock(); } @@ -1129,7 +1129,7 @@ protected void logJobHistoryAttemptStarted() { TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), launchTime, container.getId(), container.getNodeId(), - inProgressLogsUrl, completedLogsUrl, StringInterner.intern(container.getNodeHttpAddress())); + inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress()); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -1166,8 +1166,8 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion( String completedLogsUrl = null; if (finishTime <= 0) { finishTime = clock.getTime(); // comes here in case it was terminated before launch - unsuccessfulContainerId = container != null ? container.getId() : null; - unsuccessfulContainerNodeId = container != null ? container.getNodeId() : null; + unsuccessfulContainerId = container.getId(); + unsuccessfulContainerNodeId = container.getNodeId(); inProgressLogsUrl = getInProgressLogsUrl(); completedLogsUrl = getCompletedLogsUrl(); } @@ -1179,8 +1179,7 @@ attemptId, getVertex().getName(), getLaunchTime(), StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, taGeneratedEvents, creationTime, creationCausalTA, allocationTime, unsuccessfulContainerId, - unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, - StringInterner.intern(container != null ? container.getNodeHttpAddress() : null)); + unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress()); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1192,9 +1191,9 @@ private String getInProgressLogsUrl() { TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (container != null && container.getId() != null && container.getNodeHttpAddress() != null) { + if (container.getId() != null && container.getNodeHttpAddress() != null) { final String containerIdStr = container.getId().toString(); - inProgressLogsUrl = StringInterner.intern(container.getNodeHttpAddress()) + inProgressLogsUrl = container.getNodeHttpAddress() + "/" + "node/containerlogs" + "/" + containerIdStr + "/" + this.appContext.getUser(); @@ -1213,7 +1212,7 @@ private String getCompletedLogsUrl() { TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (container != null && container.getId() != null && container.getNodeId() != null && + if (container.getId() != null && container.getNodeId() != null && container.getNodeHttpAddress() != null) { final String containerIdStr = container.getId().toString(); if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -1380,7 +1379,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { if (event instanceof TaskAttemptEventContainerTerminated) { TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event; AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId()); - Container container = amContainer.getContainer(); + TezContainer container = new TezContainer(amContainer.getContainer()); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; @@ -1392,7 +1391,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { Container container = amContainer.getContainer(); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); - ta.container = container; + ta.container = new TezContainer(container); } if (ta.recoveryData == null || @@ -1428,17 +1427,16 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent; AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); - Container container = amContainer.getContainer(); + TezContainer container = new TezContainer(amContainer.getContainer()); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); - ta.container = container; + ta.container = new TezContainer(container); ta.lastNotifyProgressTimestamp = ta.clock.getTime(); ta.setLaunchTime(); // TODO Resolve to host / IP in case of a local address. - InetSocketAddress nodeHttpInetAddr = NetUtils - .createSocketAddr(StringInterner.intern(container.getNodeHttpAddress())); // TODO: Costly? + InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(container.getNodeHttpAddress()); // TODO: Costly? ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName()); ta.httpPort = nodeHttpInetAddr.getPort(); ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta)); @@ -1455,8 +1453,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { // Compute LOCALITY counter for this task. if (ta.taskHosts.contains(ta.container.getNodeId().getHost())) { ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS; - } else if (ta.taskRacks.contains( - StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()))) { + } else if (ta.taskRacks.contains(container.getRackName())) { ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS; } else { // Not computing this if the task does not have locality information. @@ -1515,8 +1512,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { super.transition(ta, event); // Inform the scheduler if (sendSchedulerEvent()) { - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container != null ? ta.container.getId() : null, - helper.getTaskAttemptState(), + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), helper.getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), ta instanceof DiagnosableEvent ? ((DiagnosableEvent) ta).getDiagnosticInfo() : null, ta.getVertex().getTaskSchedulerIdentifier())); @@ -1656,8 +1652,8 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptState.SUCCEEDED)); // Inform the Scheduler. - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container!=null ? ta.container.getId() : null, - TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), TaskAttemptState.SUCCEEDED, null, null, + ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. ta.sendEvent(new TaskEventTASucceeded(ta.attemptId)); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java new file mode 100644 index 0000000000..fd9c8b3b28 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.tez.util.StringInterner; + +public class TezContainer extends Container { + + private final Container container; + + public TezContainer(Container container) { + this.container = container; + } + + @Override + public ContainerId getId() { + return container != null ? container.getId() : null; + } + + @Override + public void setId(ContainerId id) { + container.setId(id); + } + + @Override + public NodeId getNodeId() { + return container != null ? container.getNodeId() : null; + } + + @Override + public void setNodeId(NodeId nodeId) { + container.setNodeId(nodeId); + } + + @Override + public String getNodeHttpAddress() { + return container != null ? StringInterner.intern(container.getNodeHttpAddress()) : null; + } + + @Override + public void setNodeHttpAddress(String nodeHttpAddress) { + container.setNodeHttpAddress(nodeHttpAddress); + } + + @Override + public Map>> getExposedPorts() { + return container.getExposedPorts(); + } + + @Override + public void setExposedPorts(Map>> ports) { + container.setExposedPorts(ports); + } + + @Override + public Resource getResource() { + return container.getResource(); + } + + @Override + public void setResource(Resource resource) { + container.setResource(resource); + } + + @Override + public Priority getPriority() { + return container.getPriority(); + } + + @Override + public void setPriority(Priority priority) { + container.setPriority(priority); + } + + @Override + public Token getContainerToken() { + return container.getContainerToken(); + } + + @Override + public void setContainerToken(Token containerToken) { + container.setContainerToken(containerToken); + } + + @Override + public ExecutionType getExecutionType() { + return container.getExecutionType(); + } + + @Override + public void setExecutionType(ExecutionType executionType) { + container.setExecutionType(executionType); + } + + @Override + public int compareTo(Container o) { + return container.compareTo(o); + } + + public String getRackName() { + return StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()); + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index e629481537..ee8ec67cfd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -2311,7 +2311,7 @@ public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, boolean inputFailedReported = false; public MockTaskAttemptImpl setNodeId(NodeId nodeId) { - this.container = Container.newInstance(null, nodeId, null, null, null, null); + this.container = new TezContainer(Container.newInstance(null, nodeId, null, null, null, null)); return this; } From fdce5bba32bb48846c07cef4290f98b4d5b6f18f Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 22 Aug 2023 23:40:48 +0530 Subject: [PATCH 5/8] Fix Warnings & Test. --- .../org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 ++++-- .../java/org/apache/tez/dag/app/dag/impl/TezContainer.java | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index badeff5e61..fb8aed267b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -120,6 +120,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import static org.apache.tez.dag.app.dag.impl.TezContainer.NULL_TEZ_CONTAINER; + public class TaskAttemptImpl implements TaskAttempt, EventHandler { @@ -187,7 +189,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private String trackerName; private int httpPort; - TezContainer container = new TezContainer(null); + TezContainer container = NULL_TEZ_CONTAINER; private long allocationTime; private final Vertex vertex; private final Task task; @@ -716,7 +718,7 @@ public ContainerId getAssignedContainerID() { public Container getAssignedContainer() { readLock.lock(); try { - return new TezContainer(container); + return container == NULL_TEZ_CONTAINER ? null : container; } finally { readLock.unlock(); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java index fd9c8b3b28..13163caaf3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -33,6 +33,7 @@ public class TezContainer extends Container { + public final static TezContainer NULL_TEZ_CONTAINER = new TezContainer(null); private final Container container; public TezContainer(Container container) { @@ -124,6 +125,11 @@ public int compareTo(Container o) { return container.compareTo(o); } + @Override + public boolean equals(Object obj) { + return container == obj; + } + public String getRackName() { return StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()); } From f62348c1c51d0bcd54c3d64742cfd93d4232d4e6 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 23 Aug 2023 00:17:20 +0530 Subject: [PATCH 6/8] change implemented methods. --- .../tez/dag/app/dag/impl/TezContainer.java | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java index 13163caaf3..89e528c414 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -121,13 +121,33 @@ public void setExecutionType(ExecutionType executionType) { } @Override - public int compareTo(Container o) { - return container.compareTo(o); - } - - @Override - public boolean equals(Object obj) { - return container == obj; + public int compareTo(Container other) { + if (this.getId().compareTo(other.getId()) == 0) { + if (this.getNodeId().compareTo(other.getNodeId()) == 0) { + return this.getResource().compareTo(other.getResource()); + } else { + return this.getNodeId().compareTo(other.getNodeId()); + } + } else { + return this.getId().compareTo(other.getId()); + } + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + Container otherContainer = ((TezContainer) other).container; + if (this.container == null && otherContainer == null) { + return true; + } else if (this.container == null) { + return false; + } + return this.container.equals((otherContainer)); + } + return false; } public String getRackName() { From ad0ba81063bea3c0c8b7046ded1a584bdd182d55 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 23 Aug 2023 00:51:16 +0530 Subject: [PATCH 7/8] Add HashCode. --- .../java/org/apache/tez/dag/app/dag/impl/TezContainer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java index 89e528c414..5306545bd6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -150,6 +150,11 @@ public boolean equals(Object other) { return false; } + @Override + public int hashCode() { + return container.hashCode(); + } + public String getRackName() { return StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation()); } From 0eb7b05162270b378b511edb6f9fc65ec8d9303a Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 23 Aug 2023 20:52:49 +0530 Subject: [PATCH 8/8] Add Javadoc. --- .../java/org/apache/tez/dag/app/dag/impl/TezContainer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java index 5306545bd6..ae58f80b72 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tez.util.StringInterner; +/** + * Convenience wrapper around {@link org.apache.hadoop.yarn.api.records.Container} + */ public class TezContainer extends Container { public final static TezContainer NULL_TEZ_CONTAINER = new TezContainer(null);