Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
capability.setVirtualCores(
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
if (LOG.isDebugEnabled()) {
LOG.debug("AppMaster capability = " + capability);
}
LOG.debug("AppMaster capability = {}", capability);

// Setup required Credentials for the AM launch. DAG specific credentials
// are handled separately.
Expand Down Expand Up @@ -531,10 +529,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
}
vargsFinal.add(mergedCommand.toString());

if (LOG.isDebugEnabled()) {
LOG.debug("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
}
LOG.debug("Command to launch container for ApplicationMaster is : {}", mergedCommand);

Map<String, String> environment = new TreeMap<String, String>();
TezYARNUtils.setupDefaultEnv(environment, conf,
Expand Down Expand Up @@ -968,9 +963,7 @@ public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf,
serviceAddr);
userUgi.addToken(token);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to Tez AM at " + serviceAddr);
}
LOG.debug("Connecting to Tez AM at {}", serviceAddr);
DAGClientAMProtocolBlockingPB proxy = null;
try {
proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
Expand Down
11 changes: 4 additions & 7 deletions tez-api/src/main/java/org/apache/tez/common/JavaOptsChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ public class JavaOptsChecker {

public void checkOpts(String opts) throws TezException {
Set<String> gcOpts = new TreeSet<String>();
if (LOG.isDebugEnabled()) {
LOG.debug("Checking JVM GC opts: " + opts);
}
LOG.debug("Checking JVM GC opts: {}", opts);

Matcher matcher = pattern.matcher(opts);
while (matcher.find()) {
if (matcher.groupCount() != 3) {
Expand Down Expand Up @@ -74,10 +73,8 @@ public void checkOpts(String opts) throws TezException {
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Found clashing GC opts"
+ ", conflicting GC Values=" + gcOpts);
}
LOG.debug("Found clashing GC opts, conflicting GC Values={}", gcOpts);

throw new TezException("Invalid/conflicting GC options found,"
+ " cmdOpts=\"" + opts + "\"");
}
Expand Down
12 changes: 4 additions & 8 deletions tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ public void run() {
// Report progress as 0.0f when if are errors.
processorContext.setProgress(progressVal);
} catch (Throwable th) {
if (LOG.isDebugEnabled()) {
LOG.debug("progress update: Encountered InterruptedException during"
+ " Processor={}", processorName, th);
}
LOG.debug("progress update: Encountered InterruptedException during"
+ " Processor={}", processorName, th);
if (th instanceof InterruptedException) {
// set interrupt flag to true sand exit
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -161,10 +159,8 @@ public void shutDownProgressTaskService() {
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted exception while shutting down the "
+ "executor service for the processor name={}", processorName);
}
LOG.debug("Interrupted exception while shutting down the "
+ "executor service for the processor name={}", processorName);
}
scheduledExecutorService.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1899,9 +1899,7 @@ public TezConfiguration(boolean loadDefaults) {
public static void validateProperty(String property, Scope usedScope) {
Scope validScope = PropertyScope.get(property);
if (validScope == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(property + " is not standard configuration property of tez, can not been validated");
}
LOG.debug("{} is not standard configuration property of tez, can not been validated", property);
} else {
if (usedScope.ordinal() > validScope.ordinal()) {
throw new IllegalStateException(property + " is set at the scope of " + usedScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,7 @@ private DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOption
LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline"
+ " - Application not found by YARN", e);
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.info("DAGStatus fetch failed." + e.getMessage());
}
LOG.debug("DAGStatus fetch failed", e);
}
}

Expand Down Expand Up @@ -302,9 +300,7 @@ public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> status
+ " - Application not found by YARN", e);
return null;
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage());
}
LOG.debug("ERROR fetching vertex data from Yarn Timeline", e);
}
}

Expand Down Expand Up @@ -425,9 +421,7 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts>
*/
@VisibleForTesting
protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
}
LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId);
ApplicationReport appReport;
try {
appReport = frameworkClient.getApplicationReport(appId);
Expand Down Expand Up @@ -638,9 +632,7 @@ private void switchToTimelineClient() throws IOException, TezException {
realClient.close();
realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient,
(int) (2 * PRINT_STATUS_INTERVAL_MILLIS));
if (LOG.isDebugEnabled()) {
LOG.debug("dag completed switching to DAGClientTimelineImpl");
}
LOG.debug("dag completed switching to DAGClientTimelineImpl");
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,8 @@ private static ConnectionConfigurator getNewConnectionConf(final Configuration c
try {
connectionConf = getNewSSLConnectionConf(conf, connTimeout, sslFactory);
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot load customized ssl related configuration."
+ " Falling back to system-generic settings.", e);
}
LOG.debug("Cannot load customized ssl related configuration."
+ " Falling back to system-generic settings.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ public String getSessionIdentifierString() {

@Override
public void tryKillDAG() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
}
LOG.debug("TryKill for app: {} dag:{}", appId, dagId);
try {
if (createAMProxyIfNeeded()) {
TryKillDAGRequestProto requestProto =
Expand Down Expand Up @@ -186,9 +184,7 @@ void resetProxy(Exception e) {

DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
throws IOException, TezException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
}
LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId);
GetDAGStatusRequestProto.Builder requestProtoBuilder =
GetDAGStatusRequestProto.newBuilder()
.setDagId(dagId).setTimeout(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ DAG getDAG(String dagIdStr) throws TezException {
final String currentDAGIdStr = currentDAG.getID().toString();
if (!currentDAGIdStr.equals(dagIdStr)) {
if (getAllDagIDs().contains(dagIdStr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr);
}
LOG.debug("Looking for finished dagId {} current dag is {}", dagIdStr, currentDAGIdStr);
throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " +
currentDAGIdStr);
} else {
Expand Down
36 changes: 11 additions & 25 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,7 @@ public synchronized void serviceInit(final Configuration conf) throws Exception
this.webUIService = new WebUIService(context);
addIfService(webUIService, false);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Web UI Service is not enabled.");
}
LOG.debug("Web UI Service is not enabled.");
}

this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors);
Expand Down Expand Up @@ -1335,9 +1333,8 @@ public String submitDAGToAppMaster(DAGPlan dagPlan,
// the job user's UGI context
LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());

if (LOG.isDebugEnabled()) {
LOG.debug("Invoked with additional local resources: " + additionalResources);
}
LOG.debug("Invoked with additional local resources: {}", additionalResources);

if (!dagPlan.getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
submittedDAGs.incrementAndGet();
}
Expand Down Expand Up @@ -1863,9 +1860,8 @@ void startServices() {
try {
Throwable firstError = null;
List<ServiceThread> threads = new ArrayList<ServiceThread>();
if(LOG.isDebugEnabled()) {
LOG.debug("Begin parallel start");
}
LOG.debug("Begin parallel start");

for(ServiceWithDependency sd : services.values()) {
// start the service. If this fails that service
// will be stopped and an exception raised
Expand All @@ -1889,19 +1885,15 @@ void startServices() {
if(firstError != null) {
throw ServiceStateException.convert(firstError);
}
if(LOG.isDebugEnabled()) {
LOG.debug("End parallel start");
}
LOG.debug("End parallel start");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

void initServices(Configuration conf) {
for (ServiceWithDependency sd : services.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initing service : " + sd.service);
}
LOG.debug("Initing service : {}", sd.service);
sd.service.init(conf);
}
}
Expand All @@ -1919,9 +1911,7 @@ void stopServices() {

for (int i = services.size() - 1; i >= 0; i--) {
Service service = serviceList.get(i);
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping service : " + service);
}
LOG.debug("Stopping service : {}", service);
Exception ex = ServiceOperations.stopQuietly(service);
if (ex != null && firstException == null) {
LOG.warn("Failed to stop service, name=" + service.getName(), ex);
Expand Down Expand Up @@ -2163,10 +2153,8 @@ public void serviceStop() throws Exception {
boolean deleteTezScratchData = this.amConf.getBoolean(
TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+ deleteTezScratchData);
}
LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData={}",
deleteTezScratchData);
if (deleteTezScratchData && this.taskSchedulerManager != null
&& this.taskSchedulerManager.hasUnregistered()) {
// Delete tez scratch data dir
Expand Down Expand Up @@ -2443,9 +2431,7 @@ static class DAGAppMasterShutdownHook implements Runnable {
public void run() {
LOG.info("DAGAppMasterShutdownHook invoked");
if(appMaster.getServiceState() == STATE.STOPPED) {
if(LOG.isDebugEnabled()) {
LOG.debug("DAGAppMaster already stopped. Ignoring signal");
}
LOG.debug("DAGAppMaster already stopped. Ignoring signal");
synchronized (appMaster.shutdownHandlerRunning) {
try {
if (appMaster.shutdownHandlerRunning.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,8 @@ private static HistoryEvent getNextEvent(CodedInputStream inputStream)
} catch (EOFException eof) {
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Parsed event from input stream"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
}
LOG.debug("Parsed event from input stream, eventType={}, event={}",
eventType, event);
return event;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,7 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
throws IOException, TezException {
ContainerId containerId = ConverterUtils.toContainerId(request
.getContainerIdentifier());
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from container"
+ ", request=" + request);
}
LOG.debug("Received heartbeat from container, request={}", request);

if (!registeredContainers.containsKey(containerId)) {
LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
Expand Down Expand Up @@ -488,9 +485,7 @@ public void dagSubmitted() {

@Override
public void registerRunningContainer(ContainerId containerId, int taskCommId) {
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
}
LOG.debug("ContainerId: {} registered with TaskAttemptListener", containerId);
ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO);
if (oldInfo != null) {
throw new TezUncheckedException(
Expand All @@ -515,9 +510,7 @@ public void registerRunningContainer(ContainerId containerId, int taskCommId) {

@Override
public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
}
LOG.debug("Unregistering Container from TaskAttemptListener: {}", containerId);
ContainerInfo containerInfo = registeredContainers.remove(containerId);
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ protected void startRpcServer() {
} catch (UnknownHostException e) {
throw new TezUncheckedException(e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
}
LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,14 @@ public ContainerTask getTask(ContainerContext containerContext) throws IOExcepti
} else {
ContainerId containerId = ConverterUtils.toContainerId(containerContext
.getContainerIdentifier());
if (LOG.isDebugEnabled()) {
LOG.debug("Container with id: " + containerId + " asked for a task");
}
LOG.debug("Container with id: {} asked for a task", containerId);
task = getContainerTask(containerId);
if (task != null && !task.shouldDie()) {
getContext().taskSubmitted(task.getTaskSpec().getTaskAttemptID(), containerId);
getContext().taskStartedRemotely(task.getTaskSpec().getTaskAttemptID());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("getTask returning task: " + task);
}
LOG.debug("getTask returning task: {}", task);
return task;
}

Expand All @@ -335,10 +331,7 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
TezException {
ContainerId containerId = ConverterUtils.toContainerId(request.getContainerIdentifier());
long requestId = request.getRequestId();
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from container"
+ ", request=" + request);
}
LOG.debug("Received heartbeat from container, request={}", request);

ContainerInfo containerInfo = registeredContainers.get(containerId);
if (containerInfo == null) {
Expand Down Expand Up @@ -436,9 +429,7 @@ private ContainerTask getContainerTask(ContainerId containerId) throws IOExcepti
}
} else {
task = null;
if (LOG.isDebugEnabled()) {
LOG.debug("No task assigned yet for running container: " + containerId);
}
LOG.debug("No task assigned yet for running container: {}", containerId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,10 +1651,7 @@ DAGState initializeDAG() {
if (!groupInfo.outputs.isEmpty()) {
// shared outputs
for (String vertexName : groupInfo.groupMembers) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting shared outputs for group: " + groupName +
" on vertex: " + vertexName);
}
LOG.debug("Setting shared outputs for group: {} on vertex: {}", groupName, vertexName);
Vertex v = getVertex(vertexName);
v.addSharedOutputs(groupInfo.outputs);
}
Expand Down
Loading