diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index 1ef56778c3..149bacdf5c 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -41,5 +41,51 @@ public enum DAGCounter { AM_CPU_MILLISECONDS, /** Wall clock time taken by all the tasks. */ WALL_CLOCK_MILLIS, - AM_GC_TIME_MILLIS + AM_GC_TIME_MILLIS, + + /* + * Type: # of containers + * Both allocated and launched containers before DAG start. + * This is incremented only once when the DAG starts and it's calculated + * by querying all the held containers from TaskSchedulers. + */ + INITIAL_HELD_CONTAINERS, + + /* + * Type: # of containers + * All containers that have been seen/used in this DAG by task allocation. + * This counter can be calculated at the end of DAG by simply counting the distinct + * ContainerIds that have been seen in TaskSchedulerManager.taskAllocated callbacks. + */ + TOTAL_CONTAINERS_USED, + + /* + * Type: # of events + * Number of container allocations during a DAG. This is incremented every time + * the containerAllocated callback is called in the TaskSchedulerContext. + * This counter doesn't account for initially held (launched, allocated) containers. + */ + TOTAL_CONTAINER_ALLOCATION_COUNT, + + /* + * Type: # of events + * Number of container launches during a DAG. This is incremented every time + * the containerLaunched callback is called in the ContainerLauncherContext. + * This counter doesn't account for initially held (launched, allocated) containers. + */ + TOTAL_CONTAINER_LAUNCH_COUNT, + + /* + * Type: # of events + * Number of container releases during a DAG. This is incremented every time + * the containerBeingReleased callback is called in the TaskSchedulerContext. + */ + TOTAL_CONTAINER_RELEASE_COUNT, + + /* + * Type: # of events + * Number of container reuses during a DAG. This is incremented every time + * the containerReused callback is called in the TaskSchedulerContext. + */ + TOTAL_CONTAINER_REUSE_COUNT } diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index b28a684de5..42ff56f2ba 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -14,6 +14,8 @@ package org.apache.tez.serviceplugins.api; +import java.util.List; + import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience; @@ -263,4 +265,19 @@ public abstract boolean deallocateTask(Object task, boolean taskSucceeded, */ public abstract void dagComplete() throws ServicePluginException; + /** + * Get the number of held containers. + */ + public int getHeldContainersCount() { + return 0; + } + + /** + * Callback to be used in the event of a container allocation. + */ + protected void onContainersAllocated(List containers) { + for (Container container : containers) { + getContext().containerAllocated(container); + } + } } diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index e188231e14..74342e2807 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -84,6 +84,20 @@ void taskAllocated(Object task, Object appCookie, Container container); + /** + * Indicate to the framework that a container is being allocated. + * + * @param container the actual container + */ + void containerAllocated(Container container); + + /** + * Indicate to the framework that a container is being reused: + * there is a task assigned to an already used container. + * + * @param container the actual container + */ + void containerReused(Container container); /** * Indicate to the framework that a container has completed. This is typically used by sources diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index 694de075d0..26637967ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; @@ -65,13 +66,13 @@ public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherMana @Override public void containerLaunched(ContainerId containerId) { + context.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_LAUNCH_COUNT, 1); context.getEventHandler().handle( new AMContainerEventLaunched(containerId)); ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( containerId, context.getClock().getTime(), context.getApplicationAttemptId()); context.getHistoryHandler().handle(new DAGHistoryEvent( null, lEvt)); - } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 3c99b1afd9..2476ef6b0c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -121,6 +121,7 @@ import org.apache.tez.common.TezConverterUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.VersionInfo; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.security.ACLManager; import org.apache.tez.common.security.JobTokenIdentifier; @@ -771,8 +772,9 @@ protected synchronized void handle(DAGAppMasterEvent event) { String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId()); System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId()); - // Stop vertex services if any - stopVertexServices(currentDAG); + + currentDAG.onFinish(); + if (!isSession) { LOG.info("Not a session, AM will unregister as DAG has completed"); this.taskSchedulerManager.setShouldUnregisterFlag(); @@ -1900,7 +1902,7 @@ void stopServices() { Exception firstException = null; // stop in reverse order of start if (currentDAG != null) { - stopVertexServices(currentDAG); + currentDAG.onFinish(); } List serviceList = new ArrayList(services.size()); for (ServiceWithDependency sd : services.values()) { @@ -2090,7 +2092,7 @@ public void serviceStart() throws Exception { dagEventDispatcher.handle(recoverDAGEvent); // If we reach here, then we have recoverable DAG and we need to // reinitialize the vertex services including speculators. - startVertexServices(currentDAG); + currentDAG.onStart(); this.state = DAGAppMasterState.RUNNING; } } else { @@ -2557,21 +2559,15 @@ public Void run() throws Exception { throw new TezUncheckedException(e); } + countHeldContainers(newDAG); startDAGExecution(newDAG, lrDiff); // set state after curDag is set this.state = DAGAppMasterState.RUNNING; } - private void startVertexServices(DAG dag) { - for (Vertex v : dag.getVertices().values()) { - v.startServices(); - } - } - - void stopVertexServices(DAG dag) { - for (Vertex v: dag.getVertices().values()) { - v.stopServices(); - } + private void countHeldContainers(DAG newDAG) { + newDAG.setDagCounter(DAGCounter.INITIAL_HELD_CONTAINERS, + taskSchedulerManager.getHeldContainersCount()); } private void startDAGExecution(DAG dag, final Map additionalAmResources) @@ -2605,8 +2601,9 @@ public List run() throws Exception { // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. dagEventDispatcher.handle(initDagEvent); - // Start the vertex services - startVertexServices(dag); + + dag.onStart(); + // All components have started, start the job. /** create a job-start event to get this ball rolling */ DAGEvent startDagEvent = new DAGEventStartDag(currentDAG.getID(), additionalUrlsForClasspath); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 280966ef78..ff5afb4099 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -23,8 +23,10 @@ import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGStatusBuilder; @@ -102,4 +104,18 @@ VertexStatusBuilder getVertexStatus(String vertexName, */ @Nullable DAGScheduler getDAGScheduler(); + void incrementDagCounter(DAGCounter counter, int incrValue); + void setDagCounter(DAGCounter counter, int setValue); + void addUsedContainer(ContainerId containerId); + + /** + * Called by the DAGAppMaster when the DAG is started normally or in the event of recovery. + */ + void onStart(); + + /** + * Called by the DAGAppMaster when the DAG is finished, or there is a currentDAG on AM stop. + * The implementation of this method should be idempontent. + */ + void onFinish(); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index aa28e02441..b00cea8b24 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -61,6 +61,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -248,6 +249,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, new CommitCompletedTransition(); private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final Set containersUsedByCurrentDAG = new HashSet<>(); protected static final StateMachineFactory @@ -1441,6 +1443,16 @@ private void updateCpuCounters() { dagCounters.findCounter(DAGCounter.AM_GC_TIME_MILLIS).setValue(totalDAGGCTime); } + @Override + public void incrementDagCounter(DAGCounter counter, int incrValue) { + dagCounters.findCounter(counter).increment(incrValue); + } + + @Override + public void setDagCounter(DAGCounter counter, int setValue) { + dagCounters.findCounter(counter).setValue(setValue); + } + private DAGState finished(DAGState finalState) { boolean dagError = false; try { @@ -2542,4 +2554,36 @@ public DAGImpl setLogDirs(String[] logDirs) { this.logDirs = logDirs; return this; } + + @Override + public void onStart() { + startVertexServices(); + } + + @Override + public void onFinish() { + stopVertexServices(); + handleUsedContainersOnDagFinish(); + } + + private void startVertexServices() { + for (Vertex v : getVertices().values()) { + v.startServices(); + } + } + + void stopVertexServices() { + for (Vertex v : getVertices().values()) { + v.stopServices(); + } + } + + @Override + public void addUsedContainer(ContainerId containerId) { + containersUsedByCurrentDAG.add(containerId); + } + + private void handleUsedContainersOnDagFinish() { + setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size()); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java index 1b8e19176d..6f2082adea 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java @@ -328,6 +328,8 @@ public void shutdown() throws Exception { @Override public void onContainersAllocated(List containers) { + super.onContainersAllocated(containers); + AMState appState = getContext().getAMState(); if (stopRequested || appState == AMState.COMPLETED) { LOG.info("Ignoring {} allocations since app is terminating", containers.size()); @@ -946,6 +948,9 @@ private void addTaskAssignment(TaskRequest request, HeldContainer hc) { assignedVertices.set(vertexIndex); } cset.add(hc); + if (!hc.isNew()) { + getContext().containerReused(hc.getContainer()); + } hc.assignTask(request); } @@ -1489,6 +1494,10 @@ Object getLastTask() { return lastRequest != null ? lastRequest.getTask() : null; } + boolean isNew() { + return lastRequest == null; + } + String getMatchingLocation() { switch (state) { case MATCHING_LOCAL: @@ -2089,4 +2098,9 @@ protected void afterExecute(Runnable r, Throwable t) { } } } + + @Override + public int getHeldContainersCount() { + return heldContainers.size(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index cc213cb0cb..20f37119d9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -513,4 +513,9 @@ void preemptTask(DeallocateContainerRequest request) { } } } + + @Override + public int getHeldContainersCount() { + return 0; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index a31b4f1e2d..948f7df321 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; @@ -69,6 +70,16 @@ public void taskAllocated(Object task, Object appCookie, Container container) { taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container); } + @Override + public void containerAllocated(Container container) { + appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_ALLOCATION_COUNT, 1); + } + + @Override + public void containerReused(Container container) { + appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_REUSE_COUNT, 1); + } + @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus); @@ -76,6 +87,7 @@ public void containerCompleted(Object taskLastAllocated, ContainerStatus contain @Override public void containerBeingReleased(ContainerId containerId) { + appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_RELEASE_COUNT, 1); taskSchedulerManager.containerBeingReleased(schedulerId, containerId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java index cb5980bc03..304ea8ad7f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -70,6 +70,16 @@ public void taskAllocated(Object task, Object appCookie, Container container) { container)); } + @Override + public void containerAllocated(Container container) { + executorService.submit(new ContainerAllocatedCallable(real, container)); + } + + @Override + public void containerReused(Container container) { + executorService.submit(new ContainerReusedCallable(real, container)); + } + @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { @@ -226,6 +236,38 @@ public Void call() throws Exception { } } + static class ContainerAllocatedCallable extends TaskSchedulerContextCallbackBase + implements Callable { + private final Container container; + + ContainerAllocatedCallable(TaskSchedulerContext app, Container container) { + super(app); + this.container = container; + } + + @Override + public Void call() throws Exception { + app.containerAllocated(container); + return null; + } + } + + static class ContainerReusedCallable extends TaskSchedulerContextCallbackBase + implements Callable { + private final Container container; + + ContainerReusedCallable(TaskSchedulerContext app, Container container) { + super(app); + this.container = container; + } + + @Override + public Void call() throws Exception { + app.containerReused(container); + return null; + } + } + static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase implements Callable { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index dfd48e6751..89d59aa104 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -746,7 +746,7 @@ public synchronized void taskAllocated(int schedulerId, Object task, sendEvent(new AMNodeEventContainerAllocated(container .getNodeId(), schedulerId, container.getId())); } - + appContext.getCurrentDAG().addUsedContainer(containerId); TaskAttempt taskAttempt = event.getTaskAttempt(); // TODO - perhaps check if the task still needs this container @@ -936,7 +936,7 @@ public void reportError(int taskSchedulerIndex, ServicePluginError servicePlugin } public void dagCompleted() { - for (int i = 0 ; i < taskSchedulers.length ; i++) { + for (int i = 0; i < taskSchedulers.length; i++) { try { taskSchedulers[i].dagComplete(); } catch (Exception e) { @@ -951,6 +951,14 @@ public void dagCompleted() { } } + public int getHeldContainersCount() { + int count = 0; + for (TaskSchedulerWrapper taskScheduler : taskSchedulers) { + count += taskScheduler.getTaskScheduler().getHeldContainersCount(); + } + return count; + } + public void dagSubmitted() { // Nothing to do right now. Indicates that a new DAG has been submitted and // the context has updated information. @@ -1083,5 +1091,4 @@ public String getTaskSchedulerClassName(int taskSchedulerIndex) { public TaskScheduler getTaskScheduler(int taskSchedulerIndex) { return taskSchedulers[taskSchedulerIndex].getTaskScheduler(); } - } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index ea0bdb92e8..888b0381be 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -491,6 +491,8 @@ public void onContainersCompleted(List statuses) { @Override public void onContainersAllocated(List containers) { + super.onContainersAllocated(containers); + if (isStopStarted.get()) { LOG.info("Ignoring container allocations because application is shutting down. Num " + containers.size()); @@ -1607,6 +1609,8 @@ private void assignContainer(Object task, heldContainers.put(container.getId(), new HeldContainer(container, heldContainer.getNextScheduleTime(), heldContainer.getContainerExpiryTime(), assigned, this.containerSignatureMatcher)); + } else { // if a held container is not new, it's most probably reused + getContext().containerReused(container); } heldContainer.setLastTaskInfo(assigned); } @@ -2400,4 +2404,9 @@ public String toString() { : "null"); } } + + @Override + public int getHeldContainersCount() { + return heldContainers.size(); + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 025bf8ad36..1e003accc5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -25,6 +25,8 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -42,6 +44,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.tez.common.DrainDispatcher; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConstants; @@ -56,6 +59,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; @@ -2354,4 +2358,22 @@ public void testCounterLimits() { } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testTotalContainersUsedCounter() { + initDAG(mrrDag); + dispatcher.await(); + startDAG(mrrDag); + dispatcher.await(); + + DAGImpl spy = spy(mrrDag); + spy.addUsedContainer(mock(ContainerId.class)); + spy.addUsedContainer(mock(ContainerId.class)); + + spy.onFinish(); + // 2 calls to addUsedContainer, obviously, we did it here + verify(spy, times(2)).addUsedContainer(any(ContainerId.class)); + // 1 call to setDagCounter, which happened at dag.onFinish + verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2); + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java index 0bd41f4a60..f1f2478db3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.UnknownHostException; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; @@ -75,6 +77,9 @@ public class TestContainerLauncherManager { + private static final String DAG_NAME = "dagName"; + private static final int DAG_INDEX = 1; + @Before @After public void resetTest() { @@ -249,14 +254,44 @@ public void testEventRouting() throws Exception { } } - @SuppressWarnings("unchecked") + + @SuppressWarnings("rawtypes") + @Test + public void testContainerLaunchCounter() throws TezException, InterruptedException, IOException { + AppContext appContext = mock(AppContext.class); + DAG dag = mock(DAG.class); + when(appContext.getCurrentDAG()).thenReturn(dag); + + EventHandler eventHandler = mock(EventHandler.class); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn("testlauncher").when(appContext).getContainerLauncherName(0); + + NamedEntityDescriptor containerLauncherDescriptor = + new NamedEntityDescriptor<>("testlauncher", ContainerLauncherForTest.class.getName()); + List descriptors = new LinkedList<>(); + descriptors.add(containerLauncherDescriptor); + + ContainerLauncherManager containerLauncherManager = + new ContainerLauncherManager(appContext, mock(TaskCommunicatorManagerInterface.class), "", descriptors, false); + + ContainerLaunchContext clc = mock(ContainerLaunchContext.class); + Container container = mock(Container.class); + ContainerLauncherLaunchRequestEvent launchRequestEvent = + new ContainerLauncherLaunchRequestEvent(clc, container, 0, 0, 0); + containerLauncherManager.handle(launchRequestEvent); + containerLauncherManager.close(); + + // ContainerLauncherForTest is properly calling the context callbacks + // so it's supposed to handle increment DAGCounter.TOTAL_CONTAINER_LAUNCH_COUNT + verify(dag).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_LAUNCH_COUNT, 1); // launched + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test(timeout = 5000) public void testReportFailureFromContainerLauncher() throws ServicePluginException, TezException { - final String dagName = DAG_NAME; - final int dagIndex = DAG_INDEX; - TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), dagIndex); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), DAG_INDEX); DAG dag = mock(DAG.class); - doReturn(dagName).when(dag).getName(); + doReturn(DAG_NAME).when(dag).getName(); doReturn(dagId).when(dag).getID(); EventHandler eventHandler = mock(EventHandler.class); AppContext appContext = mock(AppContext.class); @@ -264,10 +299,10 @@ public void testReportFailureFromContainerLauncher() throws ServicePluginExcepti doReturn(dag).when(appContext).getCurrentDAG(); doReturn("testlauncher").when(appContext).getContainerLauncherName(0); - NamedEntityDescriptor taskCommDescriptor = - new NamedEntityDescriptor<>("testlauncher", ContainerLauncherForTest.class.getName()); + NamedEntityDescriptor containerLauncherDescriptor = + new NamedEntityDescriptor<>("testlauncher", FailureReporterContainerLauncher.class.getName()); List list = new LinkedList<>(); - list.add(taskCommDescriptor); + list.add(containerLauncherDescriptor); ContainerLauncherManager containerLauncherManager = new ContainerLauncherManager(appContext, mock(TaskCommunicatorManagerInterface.class), "", list, false); @@ -514,26 +549,41 @@ public void stopContainer(ContainerStopRequest stopRequest) { } } - private static final String DAG_NAME = "dagName"; - private static final int DAG_INDEX = 1; - public static class ContainerLauncherForTest extends ContainerLauncher { + public static class FailureReporterContainerLauncher extends ContainerLauncher { - public ContainerLauncherForTest( - ContainerLauncherContext containerLauncherContext) { + public FailureReporterContainerLauncher(ContainerLauncherContext containerLauncherContext) { super(containerLauncherContext); } @Override - public void launchContainer(ContainerLaunchRequest launchRequest) throws - ServicePluginException { + public void launchContainer(ContainerLaunchRequest launchRequest) throws ServicePluginException { getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null); } @Override public void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException { - getContext() - .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME)); + getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", + new DagInfoImplForTest(DAG_INDEX, DAG_NAME)); } } + /** + * This container launcher simply implements ContainerLauncher methods with the proper context callbacks. + */ + public static class ContainerLauncherForTest extends ContainerLauncher { + + public ContainerLauncherForTest(ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) throws ServicePluginException { + getContext().containerLaunched(launchRequest.getContainerId()); + } + + @Override + public void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException { + getContext().containerStopRequested(stopRequest.getContainerId()); + } + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index fe5a888b56..1fc418f8c2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.mockito.ArgumentCaptor; import org.slf4j.Logger; @@ -69,6 +70,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest; @@ -126,6 +128,7 @@ public void testDelayedReuseContainerBecomesAvailable() conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); @@ -143,6 +146,7 @@ public void testDelayedReuseContainerBecomesAvailable() doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -257,6 +261,7 @@ public void testDelayedReuseContainerNotAvailable() conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); @@ -274,6 +279,7 @@ public void testDelayedReuseContainerNotAvailable() doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -358,6 +364,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -371,17 +378,18 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerManager - taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); taskSchedulerManager.init(tezConf); taskSchedulerManager.start(); - TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) - .getSpyTaskScheduler(); + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -428,6 +436,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(), eq(container1)); + verify(dag).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_ALLOCATION_COUNT, 1); // allocated // Task assigned to container completed successfully. Container should be re-used. taskSchedulerManager.handleEvent( @@ -438,6 +447,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + verify(dag).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_REUSE_COUNT, 1); // reused eventHandler.reset(); // Task assigned to container completed successfully. @@ -451,6 +461,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + verify(dag, times(2)).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_REUSE_COUNT, 1); // reused again eventHandler.reset(); // Verify no re-use if a previous task fails. @@ -463,6 +474,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT"); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + verify(dag).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_RELEASE_COUNT, 1); // released eventHandler.reset(); Container container2 = createContainer(2, "host2", resource1, priority1); @@ -482,10 +494,13 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio verifyDeAllocateTask(taskScheduler, ta14, true, null, null); verify(rmClient).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + verify(dag, times(2)).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_ALLOCATION_COUNT, 1); // new allocation + verify(dag, times(2)).incrementDagCounter(DAGCounter.TOTAL_CONTAINER_RELEASE_COUNT, 1); // then released again eventHandler.reset(); taskScheduler.shutdown(); taskSchedulerManager.close(); + dag.onFinish(); } @Test(timeout = 10000l) @@ -503,6 +518,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(tezConf); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -516,6 +532,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -690,6 +707,7 @@ public void testReuseNonLocalRequest() tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -706,6 +724,7 @@ public void testReuseNonLocalRequest() doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -814,6 +833,7 @@ public void testReuseAcrossVertices() TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -831,6 +851,7 @@ public void testReuseAcrossVertices() doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); doReturn(true).when(appContext).isSession(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -931,6 +952,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); + DAG dag1 = mock(DAG.class); AMRMClient rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); @@ -943,6 +965,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(dag1).when(appContext).getCurrentDAG(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); doReturn(true).when(appContext).isSession(); doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); @@ -1088,6 +1111,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); + DAG dag1 = mock(DAG.class); AMRMClient rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); @@ -1100,6 +1124,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(dag1).when(appContext).getCurrentDAG(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); doReturn(true).when(appContext).isSession(); doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); @@ -1112,8 +1137,8 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskSchedulerManager.init(tezConf); taskSchedulerManager.start(); - TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) - .getSpyTaskScheduler(); + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -1324,6 +1349,7 @@ public void testAssignmentOnShutdown() tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -1338,6 +1364,7 @@ public void testAssignmentOnShutdown() doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.SUCCEEDED).when(appContext).getAMState(); doReturn(true).when(appContext).isAMInCompletionState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -1391,6 +1418,7 @@ public void testDifferentResourceContainerReuse() throws Exception { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -1405,6 +1433,7 @@ public void testDifferentResourceContainerReuse() throws Exception { doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -1527,6 +1556,7 @@ public void testEnvironmentVarsContainerReuse() throws Exception { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); CapturingEventHandler eventHandler = new CapturingEventHandler(); + DAG dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient rmClientCore = new AMRMClientForTest(); @@ -1541,6 +1571,7 @@ public void testEnvironmentVarsContainerReuse() throws Exception { doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 08941e762a..1421aa1510 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -276,6 +276,20 @@ public void taskAllocated(Object task, Object appCookie, Container container) { real.taskAllocated(task, appCookie, container); } + @Override + public void containerAllocated(Container container) { + count.incrementAndGet(); + invocations++; + real.containerAllocated(container); + } + + @Override + public void containerReused(Container container) { + count.incrementAndGet(); + invocations++; + real.containerReused(container); + } + @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 9d423c047c..e3dd1ac924 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -175,7 +175,7 @@ protected void notifyForTest() { DAGClientServer mockClientService; TestEventHandler mockEventHandler; ContainerSignatureMatcher mockSigMatcher; - MockTaskSchedulerManager schedulerHandler; + MockTaskSchedulerManager taskSchedulerManager; TaskScheduler mockTaskScheduler; AMContainerMap mockAMContainerMap; WebUIService mockWebUIService; @@ -192,15 +192,15 @@ public void setup() { mockWebUIService = mock(WebUIService.class); when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000)); - schedulerHandler = new MockTaskSchedulerManager( + taskSchedulerManager = new MockTaskSchedulerManager( mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService); } @Test(timeout = 5000) public void testSimpleAllocate() throws Exception { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); @@ -226,20 +226,22 @@ public void testSimpleAllocate() throws Exception { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); + taskSchedulerManager.taskAllocated(0, mockTaskAttempt, lr, container); assertEquals(1, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) mockEventHandler.events.get(0); assertEquals(priority, assignEvent.getPriority()); assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); + + verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated } @Test(timeout = 5000) public void testTASucceededAfterContainerCleanup() throws Exception { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); @@ -266,7 +268,7 @@ public void testTASucceededAfterContainerCleanup() throws Exception { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); + taskSchedulerManager.taskAllocated(0, mockTaskAttempt, lr, container); assertEquals(1, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = @@ -278,8 +280,8 @@ public void testTASucceededAfterContainerCleanup() throws Exception { @Test(timeout = 5000) public void testTAUnsuccessfulAfterContainerCleanup() throws Exception { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); @@ -298,7 +300,7 @@ public void testTAUnsuccessfulAfterContainerCleanup() throws Exception { // Returning null container will replicate container cleanup scenario when(mockAMContainerMap.get(mockCId)).thenReturn(null); - schedulerHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded( mockTaskAttempt, mockCId, TaskAttemptState.KILLED, null, null, 0)); assertEquals(1, mockEventHandler.events.size()); @@ -311,8 +313,8 @@ public void testTAUnsuccessfulAfterContainerCleanup() throws Exception { @Test (timeout = 5000) public void testTaskBasedAffinity() throws Exception { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); TezTaskAttemptID taId = mock(TezTaskAttemptID.class); @@ -331,11 +333,11 @@ public void testTaskBasedAffinity() throws Exception { Resource resource = Resource.newInstance(100, 1); AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0); - schedulerHandler.notify.set(false); - schedulerHandler.handle(event); - synchronized (schedulerHandler.notify) { - while (!schedulerHandler.notify.get()) { - schedulerHandler.notify.wait(); + taskSchedulerManager.notify.set(false); + taskSchedulerManager.handle(event); + synchronized (taskSchedulerManager.notify) { + while (!taskSchedulerManager.notify.get()) { + taskSchedulerManager.notify.wait(); } } @@ -343,15 +345,15 @@ public void testTaskBasedAffinity() throws Exception { verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId, Priority.newInstance(3), null, event); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test (timeout = 5000) public void testContainerPreempted() throws IOException { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); String diagnostics = "Container preempted by RM."; TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); @@ -363,7 +365,7 @@ public void testContainerPreempted() throws IOException { when(mockStatus.getContainerId()).thenReturn(mockCId); when(mockStatus.getDiagnostics()).thenReturn(diagnostics); when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); + taskSchedulerManager.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -376,15 +378,15 @@ public void testContainerPreempted() throws IOException { completedEvent.getTerminationCause()); Assert.assertFalse(completedEvent.isDiskFailed()); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test (timeout = 5000) public void testContainerInternalPreempted() throws IOException, ServicePluginException { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); AMContainer mockAmContainer = mock(AMContainer.class); when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); @@ -393,7 +395,7 @@ public void testContainerInternalPreempted() throws IOException, ServicePluginEx ContainerId mockCId = mock(ContainerId.class); verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any()); when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer); - schedulerHandler.preemptContainer(0, mockCId); + taskSchedulerManager.preemptContainer(0, mockCId); verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); @@ -406,15 +408,15 @@ public void testContainerInternalPreempted() throws IOException, ServicePluginEx assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, completedEvent.getTerminationCause()); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test(timeout = 5000) public void testContainerInternalPreemptedAfterContainerCleanup() throws IOException, ServicePluginException { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); AMContainer mockAmContainer = mock(AMContainer.class); when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); @@ -424,7 +426,7 @@ public void testContainerInternalPreemptedAfterContainerCleanup() throws IOExcep verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any()); // Returning null container will replicate container cleanup scenario when(mockAMContainerMap.get(mockCId)).thenReturn(null); - schedulerHandler.preemptContainer(0, mockCId); + taskSchedulerManager.preemptContainer(0, mockCId); verify(mockTaskScheduler, times(0)).deallocateContainer(mockCId); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); @@ -437,15 +439,15 @@ public void testContainerInternalPreemptedAfterContainerCleanup() throws IOExcep assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, completedEvent.getTerminationCause()); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test (timeout = 5000) public void testContainerDiskFailed() throws IOException { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); String diagnostics = "NM disk failed."; TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); @@ -457,7 +459,7 @@ public void testContainerDiskFailed() throws IOException { when(mockStatus.getContainerId()).thenReturn(mockCId); when(mockStatus.getDiagnostics()).thenReturn(diagnostics); when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); + taskSchedulerManager.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -470,15 +472,15 @@ public void testContainerDiskFailed() throws IOException { assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, completedEvent.getTerminationCause()); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test (timeout = 5000) public void testContainerExceededPMem() throws IOException { Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); String diagnostics = "Exceeded Physical Memory"; TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); @@ -492,7 +494,7 @@ public void testContainerExceededPMem() throws IOException { // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5 when(mockStatus.getExitStatus()).thenReturn(-104); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); + taskSchedulerManager.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -505,13 +507,13 @@ public void testContainerExceededPMem() throws IOException { assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, completedEvent.getTerminationCause()); - schedulerHandler.stop(); - schedulerHandler.close(); + taskSchedulerManager.stop(); + taskSchedulerManager.close(); } @Test (timeout = 5000) public void testHistoryUrlConf() throws Exception { - Configuration conf = schedulerHandler.appContext.getAMConf(); + Configuration conf = taskSchedulerManager.appContext.getAMConf(); final ApplicationId mockApplicationId = mock(ApplicationId.class); doReturn("TEST_APP_ID").when(mockApplicationId).toString(); doReturn(mockApplicationId).when(mockAppContext).getApplicationID(); @@ -519,35 +521,35 @@ public void testHistoryUrlConf() throws Exception { // ensure history url is empty when timeline server is not the logging class conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999"); assertEquals("http://ui-host:9999/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); // ensure the trailing / in history url is handled conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9998/"); assertEquals("http://ui-host:9998/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); // ensure missing scheme in history url is handled conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/"); assertEquals("http://ui-host:9998/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); // handle bad template ex without begining / conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE, "__HISTORY_URL_BASE__#/somepath"); assertEquals("http://ui-host:9998/#/somepath", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE, "__HISTORY_URL_BASE__?viewPath=tez-app/__APPLICATION_ID__"); conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://localhost/ui/tez"); assertEquals("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); } @Test (timeout = 5000) public void testHistoryUrlWithoutScheme() throws Exception { - Configuration conf = schedulerHandler.appContext.getAMConf(); + Configuration conf = taskSchedulerManager.appContext.getAMConf(); final ApplicationId mockApplicationId = mock(ApplicationId.class); doReturn("TEST_APP_ID").when(mockApplicationId).toString(); doReturn(mockApplicationId).when(mockAppContext).getApplicationID(); @@ -555,16 +557,16 @@ public void testHistoryUrlWithoutScheme() throws Exception { conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "/foo/bar/"); conf.setBoolean(TezConfiguration.TEZ_AM_UI_HISTORY_URL_SCHEME_CHECK_ENABLED, false); assertEquals("/foo/bar/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/foo/bar/"); assertEquals("ui-host:9998/foo/bar/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); conf.setBoolean(TezConfiguration.TEZ_AM_UI_HISTORY_URL_SCHEME_CHECK_ENABLED, true); conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/foo/bar/"); assertEquals("http://ui-host:9998/foo/bar/#/tez-app/TEST_APP_ID", - schedulerHandler.getHistoryUrl()); + taskSchedulerManager.getHistoryUrl()); } @Test(timeout = 5000) @@ -909,6 +911,30 @@ public void testHandleException() throws Exception { eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class); } + @Test + public void testTaskSchedulerManangerHeldContainers() throws IOException { + Configuration conf = new Configuration(false); + UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf); + + String customSchedulerName = "fakeScheduler"; + List taskSchedulers = new LinkedList<>(); + UserPayload userPayload = UserPayload.create(ByteBuffer.allocate(4)); + taskSchedulers.add( + new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName()).setUserPayload(userPayload)); + taskSchedulers.add( + new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload)); + + TSEHForMultipleSchedulersTest tseh = new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, + mockEventHandler, mockSigMatcher, mockWebUIService, taskSchedulers, false); + + tseh.init(conf); + tseh.start(); + + Assert.assertEquals(TSEHForMultipleSchedulersTest.YARN_TASK_SCHEDULER_HELD_CONTAINERS + + TSEHForMultipleSchedulersTest.CUSTOM_TASK_SCHEDULER_HELD_CONTAINERS, tseh.getHeldContainersCount()); + tseh.close(); + } + private static class ExceptionAnswer implements Answer { @Override public Object answer(InvocationOnMock invocation) throws Throwable { @@ -925,6 +951,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager { + public static final Integer YARN_TASK_SCHEDULER_HELD_CONTAINERS = 3; + public static final Integer CUSTOM_TASK_SCHEDULER_HELD_CONTAINERS = 2; private final TaskScheduler yarnTaskScheduler; private final TaskScheduler uberTaskScheduler; private final AtomicBoolean uberSchedulerCreated = new AtomicBoolean(false); @@ -975,14 +1003,15 @@ TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, taskSchedulerContexts.add(taskSchedulerContext); testTaskSchedulers.add(yarnTaskScheduler); yarnSchedulerCreated.set(true); + when(yarnTaskScheduler.getHeldContainersCount()).thenReturn(YARN_TASK_SCHEDULER_HELD_CONTAINERS); return yarnTaskScheduler; } @Override TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) { taskSchedulerContexts.add(taskSchedulerContext); + testTaskSchedulers.add(uberTaskScheduler); uberSchedulerCreated.set(true); - testTaskSchedulers.add(yarnTaskScheduler); return uberTaskScheduler; } @@ -993,6 +1022,7 @@ TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContex taskSchedulerContexts.add(taskSchedulerContext); TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId)); testTaskSchedulers.add(taskScheduler); + when(taskScheduler.getHeldContainersCount()).thenReturn(CUSTOM_TASK_SCHEDULER_HELD_CONTAINERS); return taskScheduler; } @@ -1097,6 +1127,11 @@ public void setShouldUnregister() { public boolean hasUnregistered() { return false; } + + @Override + public int getHeldContainersCount() { + return 0; + } } private static final String DAG_NAME = "dagName"; @@ -1176,6 +1211,11 @@ public boolean hasUnregistered() throws ServicePluginException { @Override public void dagComplete() throws ServicePluginException { } + + @Override + public int getHeldContainersCount() { + return 0; + } } public static void waitFor(Supplier check, int checkEveryMillis, diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index d7d0f01126..eafedef11c 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -251,4 +251,9 @@ public Container createContainer(Resource capability, Priority priority, String return container; } } + + @Override + public int getHeldContainersCount() { + return 0; + } } diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java index 13d4815f02..60cc1f22a9 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java @@ -99,4 +99,9 @@ public boolean hasUnregistered() { @Override public void dagComplete() { } + + @Override + public int getHeldContainersCount() { + return 0; + } }