From a2429788a7062ffa4d0c22b72f5ac3c53c36a203 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Sun, 17 Jan 2021 15:19:55 -0500 Subject: [PATCH 1/2] Round 1 of changes --- .../org/apache/tez/client/TezClientUtils.java | 13 ++---- .../apache/tez/common/JavaOptsChecker.java | 11 ++--- .../org/apache/tez/common/ProgressHelper.java | 12 ++--- .../apache/tez/dag/api/TezConfiguration.java | 4 +- .../tez/dag/api/client/DAGClientImpl.java | 16 ++----- .../dag/api/client/TimelineReaderFactory.java | 6 +-- .../dag/api/client/rpc/DAGClientRPCImpl.java | 8 +--- .../tez/dag/api/client/DAGClientHandler.java | 4 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 36 +++++---------- .../apache/tez/dag/app/RecoveryParser.java | 7 +-- .../tez/dag/app/TaskCommunicatorManager.java | 13 ++---- .../dag/app/TezLocalTaskCommunicatorImpl.java | 4 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 17 ++----- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 31 ++++--------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 12 ++--- .../tez/dag/app/dag/impl/VertexImpl.java | 46 +++++-------------- .../speculation/legacy/LegacySpeculator.java | 4 +- .../launcher/TezContainerLauncherImpl.java | 4 +- .../dag/app/rm/DagAwareYarnTaskScheduler.java | 2 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 8 +--- .../dag/app/rm/YarnTaskSchedulerService.java | 31 ++++--------- .../app/rm/container/AMContainerHelpers.java | 4 +- .../tez/dag/app/web/AMWebController.java | 8 +--- .../apache/tez/dag/app/web/WebUIService.java | 4 +- .../dag/history/recovery/RecoveryService.java | 22 +++------ .../split/TezGroupedSplitsInputFormat.java | 12 ++--- .../split/TezGroupedSplitsInputFormat.java | 12 ++--- .../tez/mapreduce/client/YARNRunner.java | 4 +- .../tez/mapreduce/hadoop/MRInputHelpers.java | 8 +--- .../apache/tez/mapreduce/output/MROutput.java | 4 +- .../tez/mapreduce/processor/MRTask.java | 5 +- .../apache/tez/auxservices/IndexCache.java | 17 ++----- .../tez/auxservices/ShuffleHandler.java | 24 +++------- .../LogicalIOProcessorRuntimeTask.java | 24 +++------- .../runtime/api/impl/TezInputContextImpl.java | 4 +- .../api/impl/TezOutputContextImpl.java | 4 +- .../api/impl/TezProcessorContextImpl.java | 4 +- .../common/resources/ScalingAllocator.java | 9 +--- .../apache/tez/runtime/task/TaskReporter.java | 8 +--- .../org/apache/tez/runtime/task/TezChild.java | 4 +- .../org/apache/tez/http/HttpConnection.java | 8 +--- .../http/async/netty/AsyncHttpConnection.java | 4 +- .../impl/ShuffleInputEventHandlerImpl.java | 12 ++--- .../common/shuffle/impl/ShuffleManager.java | 8 +--- 45 files changed, 137 insertions(+), 370 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index d34d31e4cf..1e09489564 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -479,9 +479,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext( capability.setVirtualCores( amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT)); - if (LOG.isDebugEnabled()) { - LOG.debug("AppMaster capability = " + capability); - } + LOG.debug("AppMaster capability = {}", capability); // Setup required Credentials for the AM launch. DAG specific credentials // are handled separately. @@ -531,10 +529,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext( } vargsFinal.add(mergedCommand.toString()); - if (LOG.isDebugEnabled()) { - LOG.debug("Command to launch container for ApplicationMaster is : " - + mergedCommand); - } + LOG.debug("Command to launch container for ApplicationMaster is : {}", mergedCommand); Map environment = new TreeMap(); TezYARNUtils.setupDefaultEnv(environment, conf, @@ -968,9 +963,7 @@ public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, serviceAddr); userUgi.addToken(token); } - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to Tez AM at " + serviceAddr); - } + LOG.debug("Connecting to Tez AM at {}", serviceAddr); DAGClientAMProtocolBlockingPB proxy = null; try { proxy = userUgi.doAs(new PrivilegedExceptionAction() { diff --git a/tez-api/src/main/java/org/apache/tez/common/JavaOptsChecker.java b/tez-api/src/main/java/org/apache/tez/common/JavaOptsChecker.java index 6de402aae9..3e93446149 100644 --- a/tez-api/src/main/java/org/apache/tez/common/JavaOptsChecker.java +++ b/tez-api/src/main/java/org/apache/tez/common/JavaOptsChecker.java @@ -38,9 +38,8 @@ public class JavaOptsChecker { public void checkOpts(String opts) throws TezException { Set gcOpts = new TreeSet(); - if (LOG.isDebugEnabled()) { - LOG.debug("Checking JVM GC opts: " + opts); - } + LOG.debug("Checking JVM GC opts: {}", opts); + Matcher matcher = pattern.matcher(opts); while (matcher.find()) { if (matcher.groupCount() != 3) { @@ -74,10 +73,8 @@ public void checkOpts(String opts) throws TezException { } } - if (LOG.isDebugEnabled()) { - LOG.debug("Found clashing GC opts" - + ", conflicting GC Values=" + gcOpts); - } + LOG.debug("Found clashing GC opts, conflicting GC Values={}", gcOpts); + throw new TezException("Invalid/conflicting GC options found," + " cmdOpts=\"" + opts + "\""); } diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java index 1518ccdaf1..289847a96a 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java +++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java @@ -117,10 +117,8 @@ public void run() { // Report progress as 0.0f when if are errors. processorContext.setProgress(progressVal); } catch (Throwable th) { - if (LOG.isDebugEnabled()) { - LOG.debug("progress update: Encountered InterruptedException during" - + " Processor={}", processorName, th); - } + LOG.debug("progress update: Encountered InterruptedException during" + + " Processor={}", processorName, th); if (th instanceof InterruptedException) { // set interrupt flag to true sand exit Thread.currentThread().interrupt(); @@ -161,10 +159,8 @@ public void shutDownProgressTaskService() { scheduledExecutorService.shutdownNow(); } } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted exception while shutting down the " - + "executor service for the processor name={}", processorName); - } + LOG.debug("Interrupted exception while shutting down the " + + "executor service for the processor name={}", processorName); } scheduledExecutorService.shutdownNow(); } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 85f85181ca..179b1957e8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1899,9 +1899,7 @@ public TezConfiguration(boolean loadDefaults) { public static void validateProperty(String property, Scope usedScope) { Scope validScope = PropertyScope.get(property); if (validScope == null) { - if (LOG.isDebugEnabled()) { - LOG.debug(property + " is not standard configuration property of tez, can not been validated"); - } + LOG.debug("{} is not standard configuration property of tez, can not been validated", property); } else { if (usedScope.ordinal() > validScope.ordinal()) { throw new IllegalStateException(property + " is set at the scope of " + usedScope diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 01a10b269c..b54db324a9 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -247,9 +247,7 @@ private DAGStatus getDAGStatusInternal(@Nullable Set statusOption LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline" + " - Application not found by YARN", e); } catch (TezException e) { - if (LOG.isDebugEnabled()) { - LOG.info("DAGStatus fetch failed." + e.getMessage()); - } + LOG.debug("DAGStatus fetch failed", e); } } @@ -302,9 +300,7 @@ public VertexStatus getVertexStatus(String vertexName, Set status + " - Application not found by YARN", e); return null; } catch (TezException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage()); - } + LOG.debug("ERROR fetching vertex data from Yarn Timeline", e); } } @@ -425,9 +421,7 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set */ @VisibleForTesting protected DAGStatus getDAGStatusViaRM() throws TezException, IOException { - if(LOG.isDebugEnabled()) { - LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId); - } + LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId); ApplicationReport appReport; try { appReport = frameworkClient.getApplicationReport(appId); @@ -638,9 +632,7 @@ private void switchToTimelineClient() throws IOException, TezException { realClient.close(); realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient, (int) (2 * PRINT_STATUS_INTERVAL_MILLIS)); - if (LOG.isDebugEnabled()) { - LOG.debug("dag completed switching to DAGClientTimelineImpl"); - } + LOG.debug("dag completed switching to DAGClientTimelineImpl"); } @VisibleForTesting diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java index 40340cc44e..fc1595fa49 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java @@ -313,10 +313,8 @@ private static ConnectionConfigurator getNewConnectionConf(final Configuration c try { connectionConf = getNewSSLConnectionConf(conf, connTimeout, sslFactory); } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot load customized ssl related configuration." - + " Falling back to system-generic settings.", e); - } + LOG.debug("Cannot load customized ssl related configuration." + + " Falling back to system-generic settings.", e); } } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index c54058be8a..5d5752e6e2 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -146,9 +146,7 @@ public String getSessionIdentifierString() { @Override public void tryKillDAG() throws TezException, IOException { - if(LOG.isDebugEnabled()) { - LOG.debug("TryKill for app: " + appId + " dag:" + dagId); - } + LOG.debug("TryKill for app: {} dag:{}", appId, dagId); try { if (createAMProxyIfNeeded()) { TryKillDAGRequestProto requestProto = @@ -186,9 +184,7 @@ void resetProxy(Exception e) { DAGStatus getDAGStatusViaAM(Set statusOptions, long timeout) throws IOException, TezException { - if(LOG.isDebugEnabled()) { - LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId); - } + LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId); GetDAGStatusRequestProto.Builder requestProtoBuilder = GetDAGStatusRequestProto.newBuilder() .setDagId(dagId).setTimeout(timeout); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 618676d978..4cdd1ec9d1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -100,9 +100,7 @@ DAG getDAG(String dagIdStr) throws TezException { final String currentDAGIdStr = currentDAG.getID().toString(); if (!currentDAGIdStr.equals(dagIdStr)) { if (getAllDagIDs().contains(dagIdStr)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr); - } + LOG.debug("Looking for finished dagId {} current dag is {}", dagIdStr, currentDAGIdStr); throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " + currentDAGIdStr); } else { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index f4a8923d4a..cde77b3bf6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -576,9 +576,7 @@ public synchronized void serviceInit(final Configuration conf) throws Exception this.webUIService = new WebUIService(context); addIfService(webUIService, false); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Web UI Service is not enabled."); - } + LOG.debug("Web UI Service is not enabled."); } this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors); @@ -1335,9 +1333,8 @@ public String submitDAGToAppMaster(DAGPlan dagPlan, // the job user's UGI context LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Invoked with additional local resources: " + additionalResources); - } + LOG.debug("Invoked with additional local resources: {}", additionalResources); + if (!dagPlan.getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { submittedDAGs.incrementAndGet(); } @@ -1863,9 +1860,8 @@ void startServices() { try { Throwable firstError = null; List threads = new ArrayList(); - if(LOG.isDebugEnabled()) { - LOG.debug("Begin parallel start"); - } + LOG.debug("Begin parallel start"); + for(ServiceWithDependency sd : services.values()) { // start the service. If this fails that service // will be stopped and an exception raised @@ -1889,9 +1885,7 @@ void startServices() { if(firstError != null) { throw ServiceStateException.convert(firstError); } - if(LOG.isDebugEnabled()) { - LOG.debug("End parallel start"); - } + LOG.debug("End parallel start"); } catch (InterruptedException e) { e.printStackTrace(); } @@ -1899,9 +1893,7 @@ void startServices() { void initServices(Configuration conf) { for (ServiceWithDependency sd : services.values()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Initing service : " + sd.service); - } + LOG.debug("Initing service : {}", sd.service); sd.service.init(conf); } } @@ -1919,9 +1911,7 @@ void stopServices() { for (int i = services.size() - 1; i >= 0; i--) { Service service = serviceList.get(i); - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping service : " + service); - } + LOG.debug("Stopping service : {}", service); Exception ex = ServiceOperations.stopQuietly(service); if (ex != null && firstException == null) { LOG.warn("Failed to stop service, name=" + service.getName(), ex); @@ -2163,10 +2153,8 @@ public void serviceStop() throws Exception { boolean deleteTezScratchData = this.amConf.getBoolean( TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT); - if (LOG.isDebugEnabled()) { - LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData=" - + deleteTezScratchData); - } + LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData={}", + deleteTezScratchData); if (deleteTezScratchData && this.taskSchedulerManager != null && this.taskSchedulerManager.hasUnregistered()) { // Delete tez scratch data dir @@ -2443,9 +2431,7 @@ static class DAGAppMasterShutdownHook implements Runnable { public void run() { LOG.info("DAGAppMasterShutdownHook invoked"); if(appMaster.getServiceState() == STATE.STOPPED) { - if(LOG.isDebugEnabled()) { - LOG.debug("DAGAppMaster already stopped. Ignoring signal"); - } + LOG.debug("DAGAppMaster already stopped. Ignoring signal"); synchronized (appMaster.shutdownHandlerRunning) { try { if (appMaster.shutdownHandlerRunning.get()) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index dfb7f61e6e..19c24f300c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -346,11 +346,8 @@ private static HistoryEvent getNextEvent(CodedInputStream inputStream) } catch (EOFException eof) { return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Parsed event from input stream" - + ", eventType=" + eventType - + ", event=" + event.toString()); - } + LOG.debug("Parsed event from input stream, eventType={}, event={}", + eventType, event); return event; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 55b2d1b021..3a99456ed7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -220,10 +220,7 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException { ContainerId containerId = ConverterUtils.toContainerId(request .getContainerIdentifier()); - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat from container" - + ", request=" + request); - } + LOG.debug("Received heartbeat from container, request={}", request); if (!registeredContainers.containsKey(containerId)) { LOG.warn("Received task heartbeat from unknown container with id: " + containerId + @@ -488,9 +485,7 @@ public void dagSubmitted() { @Override public void registerRunningContainer(ContainerId containerId, int taskCommId) { - if (LOG.isDebugEnabled()) { - LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener"); - } + LOG.debug("ContainerId: {} registered with TaskAttemptListener", containerId); ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO); if (oldInfo != null) { throw new TezUncheckedException( @@ -515,9 +510,7 @@ public void registerRunningContainer(ContainerId containerId, int taskCommId) { @Override public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) { - if (LOG.isDebugEnabled()) { - LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); - } + LOG.debug("Unregistering Container from TaskAttemptListener: {}", containerId); ContainerInfo containerInfo = registeredContainers.remove(containerId); if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java index 15d90d3832..b5749591c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java @@ -39,8 +39,6 @@ protected void startRpcServer() { } catch (UnknownHostException e) { throw new TezUncheckedException(e); } - if (LOG.isDebugEnabled()) { - LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); - } + LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 9c8fb6c67e..6d69d36014 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -310,18 +310,14 @@ public ContainerTask getTask(ContainerContext containerContext) throws IOExcepti } else { ContainerId containerId = ConverterUtils.toContainerId(containerContext .getContainerIdentifier()); - if (LOG.isDebugEnabled()) { - LOG.debug("Container with id: " + containerId + " asked for a task"); - } + LOG.debug("Container with id: {} asked for a task", containerId); task = getContainerTask(containerId); if (task != null && !task.shouldDie()) { getContext().taskSubmitted(task.getTaskSpec().getTaskAttemptID(), containerId); getContext().taskStartedRemotely(task.getTaskSpec().getTaskAttemptID()); } } - if (LOG.isDebugEnabled()) { - LOG.debug("getTask returning task: " + task); - } + LOG.debug("getTask returning task: {}", task); return task; } @@ -335,10 +331,7 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce TezException { ContainerId containerId = ConverterUtils.toContainerId(request.getContainerIdentifier()); long requestId = request.getRequestId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat from container" - + ", request=" + request); - } + LOG.debug("Received heartbeat from container, request={}", request); ContainerInfo containerInfo = registeredContainers.get(containerId); if (containerInfo == null) { @@ -436,9 +429,7 @@ private ContainerTask getContainerTask(ContainerId containerId) throws IOExcepti } } else { task = null; - if (LOG.isDebugEnabled()) { - LOG.debug("No task assigned yet for running container: " + containerId); - } + LOG.debug("No task assigned yet for running container: {}", containerId); } } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index bb5c1aa966..09e9e71b92 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1651,10 +1651,7 @@ DAGState initializeDAG() { if (!groupInfo.outputs.isEmpty()) { // shared outputs for (String vertexName : groupInfo.groupMembers) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting shared outputs for group: " + groupName + - " on vertex: " + vertexName); - } + LOG.debug("Setting shared outputs for group: {} on vertex: {}", groupName, vertexName); Vertex v = getVertex(vertexName); v.addSharedOutputs(groupInfo.outputs); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 9a5e73de5a..8b16b2e8b7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1267,10 +1267,8 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData.getTaskAttemptFinishedEvent(); if (taFinishedEvent == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Only TaskAttemptStartedEvent but no TaskAttemptFinishedEvent, " - + "send out TaskAttemptEventAttemptKilled to move it to KILLED"); - } + LOG.debug("Only TaskAttemptStartedEvent but no TaskAttemptFinishedEvent, " + + "send out TaskAttemptEventAttemptKilled to move it to KILLED"); ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(), "Task Attempt killed in recovery due to can't recover the running task attempt", TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true)); @@ -1285,30 +1283,21 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent + "taskAttemptId=" + ta.getID()); switch (taFinishedEvent.getState()) { case FAILED: - if (LOG.isDebugEnabled()) { - LOG.debug("TaskAttemptFinishedEvent is seen with state of FAILED" - + ", send TA_FAILED to itself" - + ", attemptId=" + ta.attemptId); - } + LOG.debug("TaskAttemptFinishedEvent is seen with state of FAILED, " + + "send TA_FAILED to itself, attemptId={}", ta.attemptId); ta.sendEvent(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, taFinishedEvent.getTaskFailureType(), taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true)); break; case KILLED: - if (LOG.isDebugEnabled()) { - LOG.debug("TaskAttemptFinishedEvent is seen with state of KILLED" - + ", send TA_KILLED to itself" - + ", attemptId=" + ta.attemptId); - } + LOG.debug("TaskAttemptFinishedEvent is seen with state of KILLED, " + + "send TA_KILLED to itself, attemptId={}", ta.attemptId); ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(), taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true)); break; case SUCCEEDED: - if (LOG.isDebugEnabled()) { - LOG.debug("TaskAttemptFinishedEvent is seen with state of SUCCEEDED" - + ", send TA_DONE to itself" - + ", attemptId=" + ta.attemptId); - } + LOG.debug("TaskAttemptFinishedEvent is seen with state of SUCCEEDED, " + + "send TA_DONE to itself, attemptId={}", ta.attemptId); ta.sendEvent(new TaskAttemptEvent(ta.getID(), TaskAttemptEventType.TA_DONE)); break; default: @@ -1671,9 +1660,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) { TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData .getTaskAttemptFinishedEvent(); - if (LOG.isDebugEnabled()) { - LOG.debug("TaskAttempt is recovered to SUCCEEDED, attemptId=" + ta.attemptId); - } + LOG.debug("TaskAttempt is recovered to SUCCEEDED, attemptId={}", ta.attemptId); ta.reportedStatus.counters = taFinishedEvent.getCounters(); List tezEvents = taFinishedEvent.getTAGeneratedEvents(); if (tezEvents != null && !tezEvents.isEmpty()) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 0b4b1160f2..cb8545f8ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -690,9 +690,7 @@ private TaskAttempt selectBestAttempt() { public boolean canCommit(TezTaskAttemptID taskAttemptID) { writeLock.lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Commit go/no-go request from " + taskAttemptID); - } + LOG.debug("Commit go/no-go request from {}", taskAttemptID); TaskState state = getState(); if (state == TaskState.SCHEDULED) { // the actual running task ran and is done and asking for commit. we are still stuck @@ -730,9 +728,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptID) { } } else { if (commitAttempt.equals(taskAttemptID)) { - if (LOG.isDebugEnabled()) { - LOG.debug(taskAttemptID + " already given a go for committing the task output."); - } + LOG.debug("{} already given a go for committing the task output.", taskAttemptID); return true; } // Don't think this can be a pluggable decision, so simply raise an @@ -740,9 +736,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptID) { // Wait for commit attempt to succeed. Dont kill this. If commit // attempt fails then choose a different committer. When commit attempt // succeeds then this and others will be killed - if (LOG.isDebugEnabled()) { - LOG.debug(commitAttempt + " is current committer. Commit waiting for: " + taskAttemptID); - } + LOG.debug("{} is current committer. Commit waiting for: {}", commitAttempt, taskAttemptID); return false; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 6ae3ba55a8..e21add0e3b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -880,10 +880,8 @@ void resetCompletedTaskStatsCache(boolean recompute) { @Override public void initServices() { if (servicesInited.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping Initing services for vertex because already" - + " Initialized, name=" + this.vertexName); - } + LOG.debug("Skipping Initing services for vertex because already" + + " Initialized, name={}", this.vertexName); return; } writeLock.lock(); @@ -891,10 +889,7 @@ public void initServices() { List servicesToAdd = new ArrayList<>(); if (isSpeculationEnabled()) { // Initialize the speculator - if (LOG.isDebugEnabled()) { - LOG.debug( - "Initing service vertex speculator, name=" + this.vertexName); - } + LOG.debug("Initing service vertex speculator, name={}", this.vertexName); speculator = new LegacySpeculator(vertexConf, getAppContext(), this); speculator.init(vertexConf); servicesToAdd.add(speculator); @@ -904,9 +899,7 @@ public void initServices() { } finally { writeLock.unlock(); } - if (LOG.isDebugEnabled()) { - LOG.debug("Initing service vertex, name=" + this.vertexName); - } + LOG.debug("Initing service vertex, name={}", this.vertexName); } @Override @@ -936,9 +929,7 @@ public void stopServices() { try { if (servicesInited.get()) { for (AbstractService srvc : services) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping service : " + srvc); - } + LOG.debug("Stopping service : {}", srvc); Exception ex = ServiceOperations.stopQuietly(srvc); if (ex != null && firstException == null) { LOG.warn(String.format( @@ -1591,7 +1582,7 @@ private void computeProgress() { if (LOG.isDebugEnabled()) { if (!ProgressHelper.isProgressWithinRange(taskProg)) { LOG.debug("progress update: vertex={}, task={} incorrect; range={}", - getName(), task.getTaskId().toString(), taskProg); + getName(), task.getTaskId(), taskProg); } } accProg += ProgressHelper.processProgress(taskProg); @@ -2523,11 +2514,8 @@ private void initializeCommitters() throws Exception { final RootInputLeafOutput od = entry.getValue(); if (od.getControllerDescriptor() == null || od.getControllerDescriptor().getClassName() == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring committer as none specified for output=" - + outputName - + ", vertexId=" + logIdentifier); - } + LOG.debug("Ignoring committer as none specified for output={}, vertexId={}", + outputName, logIdentifier); continue; } LOG.info("Instantiating committer for output=" + outputName @@ -2548,19 +2536,13 @@ public Void run() throws Exception { .createClazzInstance(od.getControllerDescriptor().getClassName(), new Class[]{OutputCommitterContext.class}, new Object[]{outputCommitterContext}); - if (LOG.isDebugEnabled()) { - LOG.debug("Invoking committer init for output=" + outputName - + ", vertex=" + logIdentifier); - } + LOG.debug("Invoking committer init for output={}, vertex={}", outputName, logIdentifier); try { TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId); outputCommitter.initialize(); outputCommitters.put(outputName, outputCommitter); - if (LOG.isDebugEnabled()) { - LOG.debug("Invoking committer setup for output=" + outputName - + ", vertex=" + logIdentifier); - } + LOG.debug("Invoking committer setup for output={}, vertex={}", outputName, logIdentifier); outputCommitter.setupOutput(); } finally { appContext.getHadoopShim().clearHadoopCallerContext(); @@ -4741,9 +4723,7 @@ public NoOpVertexManager(VertexManagerPluginContext context) { @Override public void initialize() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("initialize NoOpVertexManager"); - } + LOG.debug("initialize NoOpVertexManager"); configurationDoneEvent = new VertexConfigurationDoneEvent(); configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray())); String vertexName = getContext().getVertexName(); @@ -4769,9 +4749,7 @@ public void onVertexStarted(List completions) } getContext().doneReconfiguringVertex(); int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Schedule all the tasks, numTask=" + numTasks); - } + LOG.debug("Schedule all the tasks, numTask={}", numTasks); List tasks = new ArrayList(); for (int i=0;i statuses) { // being released // completion of a container we had released earlier // an allocated container completed. notify app - if (LOG.isDebugEnabled()) { - LOG.debug("Released container completed:" + completedId + - " last allocated to task: " + task); - } + LOG.debug("Released container completed:{} last allocated to task: {}", + completedId, task); appContainerStatus.put(task, containerStatus); continue; } @@ -1216,12 +1214,9 @@ boolean preemptIfNeeded() { if(!preemptionWaitDeadlineCrossed && fitsIn(highestPriRequest.getCapability(), freeResources)) { - if (LOG.isDebugEnabled()) { - LOG.debug(highestPriRequest + " fits in free resources"); - } else { - if (numHeartbeats % 50 == 1) { - LOG.info(highestPriRequest + " fits in free resources"); - } + LOG.debug("{} fits in free resources", highestPriRequest); + if (numHeartbeats % 50 == 1) { + LOG.info(highestPriRequest + " fits in free resources"); } return true; } @@ -1509,10 +1504,8 @@ private CookieContainerRequest getMatchingRequestWithoutPriority( if (container.getId().equals( cookieContainerRequest.getAffinitizedContainer())) { // container level match - if (LOG.isDebugEnabled()) { - LOG.debug("Matching with affinity for request: " - + cookieContainerRequest + " container: " + affCId); - } + LOG.debug("Matching with affinity for request: {} container: {}", + cookieContainerRequest, affCId); return cookieContainerRequest; } if (LOG.isDebugEnabled()) { @@ -2025,10 +2018,7 @@ private void mainLoop() { if (delayedContainer == null) { continue; } - if (LOG.isDebugEnabled()) { - LOG.debug("Considering HeldContainer: " - + delayedContainer + " for assignment"); - } + LOG.debug("Considering HeldContainer: {} for assignment", delayedContainer); long currentTs = System.currentTimeMillis(); long nextScheduleTs = delayedContainer.getNextScheduleTime(); if (currentTs >= nextScheduleTs) { @@ -2091,10 +2081,7 @@ private void doAssignAll() { // honor reuse-locality flags (container not timed out yet), Don't queue // (already in queue), don't release (release happens when containers // time-out) - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to assign all delayed containers to newly received" - + " tasks"); - } + LOG.debug("Trying to assign all delayed containers to newly received tasks"); Iterator iter = delayedContainers.iterator(); while(iter.hasNext()) { HeldContainer delayedContainer = iter.next(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index ee322655af..19cf5b7e21 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -126,9 +126,7 @@ private static ContainerLaunchContext createCommonContainerLaunchContext( containerTokens_dob.getLength()); // Add shuffle token - if (LOG.isDebugEnabled()) { - LOG.debug("Putting shuffle token in serviceData in common CLC"); - } + LOG.debug("Putting shuffle token in serviceData in common CLC"); serviceData.put(auxiliaryService, TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials))); } catch (IOException e) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 2115dac247..08d754d8a0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -144,9 +144,7 @@ public void setCorsHeaders() { URL url = new URL(historyUrlBase); origin = url.getProtocol() + "://" + url.getAuthority(); } catch (MalformedURLException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Invalid url set for tez history url base: " + historyUrlBase, e); - } + LOG.debug("Invalid url set for tez history url base: {}", historyUrlBase, e); } } @@ -161,9 +159,7 @@ public void setCorsHeaders() { } void sendErrorResponse(int sc, String msg, Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug(msg, e); - } + LOG.debug(msg, e); try { response().sendError(sc, msg); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java index b1560a5ead..1670370187 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -119,9 +119,7 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { if (this.webApp != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping WebApp"); - } + LOG.debug("Stopping WebApp"); this.webApp.stop(); } super.serviceStop(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index a0a152c81c..45e7d2fc9c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -405,11 +405,8 @@ private void createFatalErrorFlagDir() throws IOException { protected void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Handling summary event" - + ", dagID=" + dagID - + ", eventType=" + eventType); - } + LOG.debug("Handling summary event, dagID={}, eventType={}", dagID, eventType); + if (summaryStream == null) { Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryPath); if (LOG.isDebugEnabled()) { @@ -470,11 +467,8 @@ protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException { } - if (LOG.isDebugEnabled()) { - LOG.debug("Writing recovery event to output stream" - + ", dagId=" + dagID - + ", eventType=" + eventType); - } + LOG.debug("Writing recovery event to output stream, dagId={}, eventType={}", + dagID, eventType); ++unflushedEventsCount; recoveryStream.codedOutputStream.writeFixed32NoTag(event.getHistoryEvent().getEventType().ordinal()); event.getHistoryEvent().toProtoStream(recoveryStream.codedOutputStream); @@ -489,11 +483,9 @@ private void maybeFlush(RecoveryStream recoveryStream) throws IOException { boolean doFlush = false; if (maxUnflushedEvents >=0 && unflushedEventsCount >= maxUnflushedEvents) { - if (LOG.isDebugEnabled()) { - LOG.debug("Max unflushed events count reached. Flushing recovery data" - + ", unflushedEventsCount=" + unflushedEventsCount - + ", maxUnflushedEvents=" + maxUnflushedEvents); - } + LOG.debug("Max unflushed events count reached. Flushing recovery data, " + + "unflushedEventsCount={}, maxUnflushedEvents={}", unflushedEventsCount, + maxUnflushedEvents); doFlush = true; } else if (flushInterval >= 0 && ((currentTime - lastFlushTime) >= (flushInterval*1000))) { diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index bce16eee56..61e1f6c431 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -68,25 +68,19 @@ public void setInputFormat(InputFormat wrappedInputFormat) { public void setSplitSizeEstimator(SplitSizeEstimator estimator) { Preconditions.checkArgument(estimator != null); this.estimator = estimator; - if (LOG.isDebugEnabled()) { - LOG.debug("Split size estimator : " + estimator); - } + LOG.debug("Split size estimator : {}", estimator); } public void setSplitLocationProvider(SplitLocationProvider locationProvider) { Preconditions.checkArgument(locationProvider != null); this.locationProvider = locationProvider; - if (LOG.isDebugEnabled()) { - LOG.debug("Split size location provider: " + locationProvider); - } + LOG.debug("Split size location provider: {}", locationProvider); } public void setDesiredNumberOfSplits(int num) { Preconditions.checkArgument(num >= 0); this.desiredNumSplits = num; - if (LOG.isDebugEnabled()) { - LOG.debug("desiredNumSplits: " + desiredNumSplits); - } + LOG.debug("desiredNumSplits: {}", desiredNumSplits); } @Override diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java index 226425cd57..863f9aa792 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java @@ -70,25 +70,19 @@ public void setInputFormat(InputFormat wrappedInputFormat) { public void setDesiredNumberOfSplits(int num) { Preconditions.checkArgument(num >= 0); this.desiredNumSplits = num; - if (LOG.isDebugEnabled()) { - LOG.debug("desiredNumSplits: " + desiredNumSplits); - } + LOG.debug("desiredNumSplits: {}", desiredNumSplits); } public void setSplitSizeEstimator(SplitSizeEstimator estimator) { Preconditions.checkArgument(estimator != null); this.estimator = estimator; - if (LOG.isDebugEnabled()) { - LOG.debug("Split size estimator : " + estimator); - } + LOG.debug("Split size estimator : {}", estimator); } public void setSplitLocationProvider(SplitLocationProvider locationProvider) { Preconditions.checkArgument(locationProvider != null); this.locationProvider = locationProvider; - if (LOG.isDebugEnabled()) { - LOG.debug("Split location provider : " + locationProvider); - } + LOG.debug("Split location provider : {}", locationProvider); } @Override diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 5a7d754ba5..9dba357951 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -616,9 +616,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) dagAMConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, javaOpts.toString()); if (envStr.length() > 0) { dagAMConf.set(TezConfiguration.TEZ_AM_LAUNCH_ENV, envStr); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting MR AM env to : " + envStr); - } + LOG.debug("Setting MR AM env to : {}", envStr); } // Submit to ResourceManager diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 19d64a5abb..a8e85a34e4 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -322,18 +322,14 @@ public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf, InputSplitInfoMem splitInfoMem = null; JobConf jobConf = new JobConf(conf); if (jobConf.getUseNewMapper()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Generating mapreduce api input splits"); - } + LOG.debug("Generating mapreduce api input splits"); Job job = Job.getInstance(conf); org.apache.hadoop.mapreduce.InputSplit[] splits = generateNewSplits(job, groupSplits, sortSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), splits.length, job.getCredentials(), job.getConfiguration()); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Generating mapred api input splits"); - } + LOG.debug("Generating mapred api input splits"); org.apache.hadoop.mapred.InputSplit[] splits = generateOldSplits(jobConf, groupSplits, sortSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 950e629907..19ece5a0f6 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -488,9 +488,7 @@ public void initCommitter(JobConf job, boolean useNewApi) throws IOException, InterruptedException { if (useNewApi) { - if (LOG.isDebugEnabled()) { - LOG.debug("using new api for output committer"); - } + LOG.debug("using new api for output committer"); this.committer = newOutputFormat.getOutputCommitter( newApiTaskAttemptContext); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index b79f19cfa0..1a13168cb7 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -509,10 +509,7 @@ public void close() throws IOException { comparator, keyClass, valueClass); - if (LOG.isDebugEnabled()) { - LOG.debug("Using key class: " + keyClass - + ", valueClass: " + valueClass); - } + LOG.debug("Using key class: {}, valueClass: {}", keyClass, valueClass); org.apache.hadoop.mapreduce.Reducer.Context reducerContext = diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java index 625f7ab8ba..54db975292 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java @@ -84,9 +84,7 @@ public TezSpillRecord getSpillRecord(String mapId, Path fileName, String expecte } } } - if (LOG.isDebugEnabled()) { - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); - } + LOG.debug("IndexCache HIT: MapId {} found", mapId); } if (info.mapSpillRecord.size() == 0) { @@ -125,9 +123,7 @@ public TezIndexRecord getIndexInformation(String mapId, int reduce, } } } - if (LOG.isDebugEnabled()) { - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); - } + LOG.debug("IndexCache HIT: MapId {} found", mapId); } if (info.mapSpillRecord.size() == 0 || @@ -161,14 +157,11 @@ private IndexInformation readIndexFileToCache(Path indexFileName, } } } - if (LOG.isDebugEnabled()) { - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); - } + LOG.debug("IndexCache HIT: MapId {} found", mapId); return info; } - if (LOG.isDebugEnabled()) { - LOG.debug("IndexCache MISS: MapId " + mapId + " not found"); - } + LOG.debug("IndexCache MISS: MapId {} not found", mapId); + TezSpillRecord tmp = null; try { tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner); diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index b67883dfcf..55389ea78e 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -933,9 +933,7 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Path mapOutputFileName = lDirAlloc.getLocalPathToRead( attemptBase + Path.SEPARATOR + DATA_FILE_NAME, conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded : " + key + " via loader"); - } + LOG.debug("Loaded : {} via loader", key); return new AttemptPathInfo(indexFileName, mapOutputFileName); } }); @@ -1011,10 +1009,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) boolean keepAliveParam = false; if (keepAliveList != null && keepAliveList.size() == 1) { keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0)); - if (LOG.isDebugEnabled()) { - LOG.debug("KeepAliveParam : " + keepAliveList - + " : " + keepAliveParam); - } + LOG.debug("KeepAliveParam : {} : {}", keepAliveList, keepAliveParam); } final List mapIds = splitMaps(q.get("map")); final Range reduceRange = splitReduces(q.get("reduce")); @@ -1226,11 +1221,8 @@ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, AttemptPathIdentifier identifier = new AttemptPathIdentifier( jobId, dagId, user, mapId); pathInfo = pathCache.get(identifier); - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved pathInfo for " + identifier + - " check for corresponding loaded messages to determine whether" + - " it was loaded or cached"); - } + LOG.debug("Retrieved pathInfo for {} check for corresponding loaded " + + "messages to determine whether it was loaded or cached", identifier); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); @@ -1303,13 +1295,9 @@ protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); - if (LOG.isDebugEnabled()) { - LOG.debug("Content Length in shuffle : " + contentLength); - } + LOG.debug("Content Length in shuffle : {}", contentLength); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting connection close header..."); - } + LOG.debug("Setting connection close header..."); response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index f8a3de2242..583cc0099a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -467,9 +467,7 @@ protected Void callInternal() throws Exception { } protected Void _callInternal() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing Input using InputSpec: " + inputSpec); - } + LOG.debug("Initializing Input using InputSpec: {}", inputSpec); String edgeName = inputSpec.getSourceVertexName(); InputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex); LogicalInput input = createInput(inputSpec, inputContext); @@ -483,9 +481,7 @@ protected Void _callInternal() throws Exception { inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); initializedInputs.put(edgeName, input); - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized Input with src edge: " + edgeName); - } + LOG.debug("Initialized Input with src edge: {}", edgeName); initializedInputs.put(edgeName, input); return null; } @@ -512,9 +508,7 @@ protected Void callInternal() throws Exception { } protected Void _callInternal() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Starting Input with src edge: " + srcVertexName); - } + LOG.debug("Starting Input with src edge: {}", srcVertexName); input.start(); LOG.info("Started Input with src edge: " + srcVertexName); @@ -544,9 +538,7 @@ protected Void callInternal() throws Exception { } protected Void _callInternal() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing Output using OutputSpec: " + outputSpec); - } + LOG.debug("Initializing Output using OutputSpec: {}", outputSpec); String edgeName = outputSpec.getDestinationVertexName(); OutputContext outputContext = createOutputContext(outputSpec, outputIndex); LogicalOutput output = createOutput(outputSpec, outputContext); @@ -559,9 +551,7 @@ protected Void _callInternal() throws Exception { outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); initializedOutputs.put(edgeName, output); - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized Output with dest edge: " + edgeName); - } + LOG.debug("Initialized Output with dest edge: {}", edgeName); initializedOutputs.put(edgeName, output); return null; } @@ -579,9 +569,7 @@ private void initializeGroupInputs() throws TezException { if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) { groupInputsMap = new ConcurrentHashMap(groupInputSpecs.size()); for (GroupInputSpec groupInputSpec : groupInputSpecs) { - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing GroupInput using GroupInputSpec: " + groupInputSpec); - } + LOG.debug("Initializing GroupInput using GroupInputSpec: {}", groupInputSpec); MergedInputContext mergedInputContext = new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(), groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs, this); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index 9ff284d5bf..f28573a649 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -187,8 +187,6 @@ public void close() throws IOException { super.close(); this.userPayload = null; this.inputReadyTracker = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Cleared TezInputContextImpl related information"); - } + LOG.debug("Cleared TezInputContextImpl related information"); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index db3212280f..ec8280a239 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -167,8 +167,6 @@ public OutputStatisticsReporter getStatisticsReporter() { public void close() throws IOException { super.close(); this.userPayload = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Cleared TezOutputContextImpl related information"); - } + LOG.debug("Cleared TezOutputContextImpl related information"); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 71ed077c50..e09aa8377e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -143,9 +143,7 @@ public void close() throws IOException { super.close(); this.userPayload = null; this.inputReadyTracker = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Cleared TezProcessorContextImpl related information"); - } + LOG.debug("Cleared TezProcessorContextImpl related information"); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/ScalingAllocator.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/ScalingAllocator.java index 872632e1ea..e045abd9d2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/ScalingAllocator.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/ScalingAllocator.java @@ -80,16 +80,11 @@ public Long apply(InitialMemoryRequestContext requestContext) { long requestedSize = request.getRequestedSize(); if (requestedSize == 0) { allocations.add(0l); - if (LOG.isDebugEnabled()) { - LOG.debug("Scaling requested: 0 to allocated: 0"); - } + LOG.debug("Scaling requested: 0 to allocated: 0"); } else { long allocated = (long) ((requestedSize / (double) totalRequested) * availableForAllocation); allocations.add(allocated); - if (LOG.isDebugEnabled()) { - LOG.debug("Scaling requested: " + requestedSize + " to allocated: " + allocated); - } - + LOG.debug("Scaling requested: {} to allocated: {}", requestedSize, allocated); } } return allocations; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index fb066fd2bd..978942d4e7 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -264,16 +264,12 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); - if (LOG.isDebugEnabled()) { - LOG.debug("Sending heartbeat to AM, request=" + request); - } + LOG.debug("Sending heartbeat to AM, request={}", request); maybeLogCounters(); TezHeartbeatResponse response = umbilical.heartbeat(request); - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat response from AM, response=" + response); - } + LOG.debug("Received heartbeat response from AM, response={}", response); if (response.shouldDie()) { LOG.info("Received should die response from AM"); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 86ceb12d7d..c82355a9fa 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -337,9 +337,7 @@ private void handleNewTaskLocalResources(ContainerTask containerTask, UserGroupInformation ugi) throws IOException, TezException { final Map additionalResources = containerTask.getAdditionalResources(); - if (LOG.isDebugEnabled()) { - LOG.debug("Additional Resources added to container: " + additionalResources); - } + LOG.debug("Additional Resources added to container: {}", additionalResources); if (additionalResources != null && !additionalResources.isEmpty()) { LOG.info("Localizing additional local resources for Task : " + additionalResources); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java index 3b45cdd709..e12331c250 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java @@ -77,9 +77,7 @@ public HttpConnection(URL url, HttpConnectionParams connParams, this.url = url; this.stopWatch = new StopWatch(); this.urlLogCount = new AtomicLong(); - if (LOG.isDebugEnabled()) { - LOG.debug("MapOutput URL :" + url.toString()); - } + LOG.debug("MapOutput URL :{}", url); } @VisibleForTesting @@ -278,9 +276,7 @@ public void cleanup(boolean disconnect) throws IOException { stopWatch.reset().start(); try { if (input != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing input on " + logIdentifier); - } + LOG.debug("Closing input on {}", logIdentifier); input.close(); input = null; } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java index 43f64b82b3..63b8934821 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java @@ -113,9 +113,7 @@ public AsyncHttpConnection(URL url, HttpConnectionParams connParams, this.httpConnParams = connParams; this.url = url; this.stopWatch = new StopWatch(); - if (LOG.isDebugEnabled()) { - LOG.debug("MapOutput URL :" + url.toString()); - } + LOG.debug("MapOutput URL :{}", url); initClient(httpConnParams); pos = new PipedOutputStream(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index e924876628..bcb7bb58ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -183,10 +183,8 @@ private void processDataMovementEvent(DataMovementEvent dme, DataMovementEventPa if (emptyPartitionsBitSet.get(srcIndex)) { CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, false); - if (LOG.isDebugEnabled()) { - LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: [" - + srcAttemptIdentifier + "]. Not fetching."); - } + LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.", + srcIndex, srcAttemptIdentifier); numDmeEventsNoData.getAndIncrement(); shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(0)); return; @@ -261,10 +259,8 @@ private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovement allPartitionsEmpty &= emptyPartitionsBitSet.get(srcPartitionId); if (emptyPartitionsBitSet.get(srcPartitionId)) { InputAttemptIdentifier srcAttemptIdentifier = compositeInputAttemptIdentifier.expand(i); - if (LOG.isDebugEnabled()) { - LOG.debug("Source partition: " + srcPartitionId + " did not generate any data. SrcAttempt: [" - + srcAttemptIdentifier + "]. Not fetching."); - } + LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.", + srcPartitionId, srcAttemptIdentifier); numDmeEventsNoData.getAndIncrement(); shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 901ee08a7a..f8b022cf32 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -441,9 +441,7 @@ protected Void callInternal() throws Exception { break; } - if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: " + numCompletedInputs); - } + LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numCompletedInputs); if (numCompletedInputs.get() < numInputs && !isShutdown.get()) { lock.lock(); try { @@ -661,9 +659,7 @@ public void addKnownInput(String hostName, int port, public void addCompletedInputWithNoData( InputAttemptIdentifier srcAttemptIdentifier) { int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); - if (LOG.isDebugEnabled()) { - LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); - } + LOG.debug("No input data exists for SrcTask: {}. Marking as complete.", inputIdentifier); lock.lock(); try { if (!completedInputSet.get(inputIdentifier)) { From 250803a391af642a93f05e1e5411e70de2f29698 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Tue, 19 Jan 2021 15:41:23 -0500 Subject: [PATCH 2/2] review batch of classes --- .../library/common/TezRuntimeUtils.java | 4 +- .../library/common/shuffle/ShuffleUtils.java | 8 +--- .../common/shuffle/impl/ShuffleManager.java | 12 ++---- .../orderedgrouped/FetcherOrderedGrouped.java | 38 +++++-------------- .../shuffle/orderedgrouped/MergeManager.java | 8 +--- ...huffleInputEventHandlerOrderedGrouped.java | 17 +++------ .../orderedgrouped/ShuffleScheduler.java | 12 ++---- .../library/common/sort/impl/IFile.java | 4 +- .../task/local/output/TezTaskOutputFiles.java | 6 +-- .../library/input/OrderedGroupedKVInput.java | 4 +- .../WeightedScalingMemoryDistributor.java | 4 +- .../org/apache/tez/tools/TFileLoader.java | 5 +-- 12 files changed, 30 insertions(+), 92 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index daeafbc6fe..9ff3d1c1e5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -80,9 +80,7 @@ public static Combiner instantiateCombiner(Configuration conf, TaskContext taskC if (className == null) { return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Using Combiner class: " + className); - } + LOG.debug("Using Combiner class: {}", className); try { clazz = (Class) conf.getClassByName(className); } catch (ClassNotFoundException e) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 40909d4715..6a61474a5a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -121,10 +121,7 @@ public static void shuffleToMemory(byte[] shuffleData, IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec, ifileReadAhead, ifileReadAheadLength); // metrics.inputBytes(shuffleData.length); - if (LOG.isDebugEnabled()) { - LOG.debug("Read " + shuffleData.length + " bytes from input for " - + identifier); - } + LOG.debug("Read {} bytes from input for {}", shuffleData.length, identifier); } catch (InternalError | Exception e) { // Close the streams LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + @@ -200,8 +197,7 @@ public static void ioCleanup(Closeable... closeables) { try { c.close(); } catch (IOException e) { - if (LOG.isDebugEnabled()) - LOG.debug("Exception in closing " + c, e); + LOG.debug("Exception in closing {}", c, e); } } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index f8b022cf32..56195a8641 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -1140,9 +1140,7 @@ public void onSuccess(Void result) { @Override public void onFailure(Throwable t) { if (isShutdown.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error: " + t); - } + LOG.debug("{}: Already shutdown. Ignoring error.", srcNameTrimmed, t); } else { LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t); inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Scheduler Failed"); @@ -1173,9 +1171,7 @@ private void doBookKeepingForFetcherComplete() { public void onSuccess(FetchResult result) { fetcher.shutdown(); if (isShutdown.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring event from fetcher"); - } + LOG.debug("{}: Already shutdown. Ignoring event from fetcher", srcNameTrimmed); } else { Iterable pendingInputs = result.getPendingInputs(); if (pendingInputs != null && pendingInputs.iterator().hasNext()) { @@ -1198,9 +1194,7 @@ public void onFailure(Throwable t) { // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down. fetcher.shutdown(); if (isShutdown.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error from fetcher: " + t); - } + LOG.debug("{}: Already shutdown. Ignoring error from fetcher.", srcNameTrimmed, t); } else { LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t); shuffleError = t; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 327232710b..c9bd092f05 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -208,9 +208,7 @@ public Void callInternal() { public void shutDown() { if (!stopped) { - if (LOG.isDebugEnabled()) { - LOG.debug("Fetcher stopped for host " + mapHost); - } + LOG.debug("Fetcher stopped for host {}", mapHost); stopped = true; // An interrupt will come in while shutting down the thread. cleanupCurrentConnection(false); @@ -288,19 +286,14 @@ protected void copyFromHost(MapHost host) throws IOException { // Setup connection again if disconnected cleanupCurrentConnection(true); if (stopped) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not re-establishing connection since Fetcher has been stopped"); - } + LOG.debug("Not re-establishing connection since Fetcher has been stopped"); return; } // Connect with retry if (!setupConnection(host, remaining.values())) { if (stopped) { cleanupCurrentConnection(true); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Not reporting connection re-establishment failure since fetcher is stopped"); - } + LOG.debug("Not reporting connection re-establishment failure since fetcher is stopped"); return; } failedTasks = new InputAttemptFetchFailure[] { @@ -354,9 +347,7 @@ boolean setupConnection(MapHost host, Collection attempt connectSucceeded = httpConnection.connect(); if (stopped) { - if (LOG.isDebugEnabled()) { - LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning"); - } + LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning"); return false; } input = httpConnection.getInputStream(); @@ -367,9 +358,7 @@ boolean setupConnection(MapHost host, Collection attempt Thread.currentThread().interrupt(); //reset status } if (stopped) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown"); - } + LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown"); return false; } ioErrs.increment(1); @@ -471,9 +460,7 @@ protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream return new InputAttemptFetchFailure[] { InputAttemptFetchFailure.fromAttempt(getNextRemainingAttempt()) }; } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already shutdown. Ignoring invalid map id error"); - } + LOG.debug("Already shutdown. Ignoring invalid map id error"); return EMPTY_ATTEMPT_ID_ARRAY; } } @@ -518,9 +505,7 @@ protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream return new InputAttemptFetchFailure[] { new InputAttemptFetchFailure(getNextRemainingAttempt()) }; } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. Ignoring verification failure."); - } + LOG.debug("Already stopped. Ignoring verification failure."); return EMPTY_ATTEMPT_ID_ARRAY; } } @@ -544,9 +529,7 @@ protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream ioErrs.increment(1); scheduler.reportLocalError(e); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. Ignoring error from merger.reserve"); - } + LOG.debug("Already stopped. Ignoring error from merger.reserve"); } return EMPTY_ATTEMPT_ID_ARRAY; } @@ -760,10 +743,7 @@ protected void setupLocalDiskFetch(MapHost host) throws InterruptedException { LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e); } else { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Ignoring fetch error during local disk copy since fetcher has already been stopped"); - } + LOG.debug("Ignoring fetch error during local disk copy since fetcher has already been stopped"); return; } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 59ff577fed..46360e1287 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -388,9 +388,7 @@ public void waitForInMemoryMerge() throws InterruptedException { } if (triggerAdditionalMerge) { inMemoryMerger.waitForMerge(); - if (LOG.isDebugEnabled()) { - LOG.debug("Additional in-memory merge triggered"); - } + LOG.debug("Additional in-memory merge triggered"); } } @@ -700,9 +698,7 @@ static void cleanup(FileSystem fs, Path path) { } try { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting " + path); - } + LOG.debug("Deleting {}", path); fs.delete(path, true); } catch (IOException e) { LOG.info("Error in deleting " + path); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 116098fe26..0c55a3a388 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -156,11 +156,8 @@ private void processDataMovementEvent(DataMovementEvent dmEvent, DataMovementEve if (shufflePayload.hasEmptyPartitions()) { try { if (emptyPartitionsBitSet.get(partitionId)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" - + srcAttemptIdentifier + "]. Not fetching."); - } + LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.", + partitionId, srcAttemptIdentifier); numDmeEventsNoData.getAndIncrement(); scheduler.copySucceeded(srcAttemptIdentifier.expand(0), null, 0, 0, 0, null, true); return; @@ -191,10 +188,8 @@ private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovement allPartitionsEmpty &= emptyPartitionsBitSet.get(srcPartitionId); if (emptyPartitionsBitSet.get(srcPartitionId)) { InputAttemptIdentifier srcInputAttemptIdentifier = compositeInputAttemptIdentifier.expand(i); - if (LOG.isDebugEnabled()) { - LOG.debug("Source partition: " + srcPartitionId + " did not generate any data. SrcAttempt: [" - + srcInputAttemptIdentifier + "]. Not fetching."); - } + LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.", + srcPartitionId, srcInputAttemptIdentifier); numDmeEventsNoData.getAndIncrement(); scheduler.copySucceeded(srcInputAttemptIdentifier, null, 0, 0, 0, null, true); } @@ -212,9 +207,7 @@ private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovement private void processTaskFailedEvent(InputFailedEvent ifEvent) { InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion()); scheduler.obsoleteInput(taIdentifier); - if (LOG.isDebugEnabled()) { - LOG.debug("Obsoleting output of src-task: " + taIdentifier); - } + LOG.debug("Obsoleting output of src-task: {}", taIdentifier); } /** diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 416041e005..67681cedc7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1104,9 +1104,7 @@ public void obsoleteInput(InputAttemptIdentifier srcAttempt) { if (eventInfo.eventsProcessed.isEmpty() && !eventInfo.scheduledForDownload) { // obsoleted anyways; no point tracking if nothing is started pipelinedShuffleInfoEventsMap.remove(srcAttempt.getInputIdentifier()); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing " + eventInfo + " from tracking"); - } + LOG.debug("Removing {} from tracking", eventInfo); return; } IOException exception = new IOException(srcAttempt + " is marked as obsoleteInput, but it " @@ -1128,9 +1126,7 @@ public synchronized void putBackKnownMapOutput(MapHost host, public synchronized MapHost getHost() throws InterruptedException { while (pendingHosts.isEmpty() && remainingMaps.get() > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("PendingHosts=" + pendingHosts); - } + LOG.debug("PendingHosts={}", pendingHosts); waitAndNotifyProgress(); } @@ -1436,9 +1432,7 @@ protected Void callInternal() throws InterruptedException { if (mapHost == null) { break; // Check for the exit condition. } - if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + mapHost.toString()); - } + LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, mapHost); if (!isShutdown.get()) { count++; if (LOG.isDebugEnabled()) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 1b2aefff41..a4bbf5aabf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -844,9 +844,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen try { in.close(); } catch(IOException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Exception in closing " + in, e); - } + LOG.debug("Exception in closing {}", in, e); } } throw ioe; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java index 88474f99f4..3fb90865d1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java @@ -65,11 +65,7 @@ public TezTaskOutputFiles(Configuration conf, String uniqueId, int dagID) { * if service_id = tez_shuffle then "${appDir}/dagId/output/${uniqueId}" */ private Path getAttemptOutputDir() { - if (LOG.isDebugEnabled()) { - LOG.debug("getAttemptOutputDir: " - + Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" - + uniqueId); - } + LOG.debug("getAttemptOutputDir: {}/{}", Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId); String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR); return new Path(dagPath, uniqueId); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 2b405bb343..313c13d188 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -130,9 +130,7 @@ public synchronized void start() throws IOException { // Start the shuffle - copy and merge shuffle = createShuffle(); shuffle.run(); - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized the handlers in shuffle..Safe to start processing.."); - } + LOG.debug("Initialized the handlers in shuffle..Safe to start processing.."); List pending = new LinkedList(); pendingEvents.drainTo(pending); if (pending.size() > 0) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java index b82e6d3d8f..52f3d44b4f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java @@ -244,9 +244,7 @@ private RequestType getRequestTypeForClass(String className) { requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT; } else { requestType = RequestType.OTHER; - if (LOG.isDebugEnabled()) { - LOG.debug("Falling back to RequestType.OTHER for class: " + className); - } + LOG.debug("Falling back to RequestType.OTHER for class: {}", className); } return requestType; } diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java index 18e9940d1d..320428b8cd 100644 --- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java +++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java @@ -66,10 +66,7 @@ public Tuple getNext() throws IOException { currentKey = recReader.getCurrentKey(); String line = recReader.getCurrentValue().toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("currentKey: " + currentKey - + ", line=" + line); - } + LOG.debug("currentKey: {}, line={}", currentKey, line); //Tuple would be of format: machine, key, line Tuple tuple = tupleFactory.newTuple(3); if (currentKey != null) {