From 9040a2fc515fc427770dbf7110295e6f6d398e84 Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Wed, 13 Mar 2024 11:02:41 -0700 Subject: [PATCH 1/8] TEZ-4547: Add Tez AM JobID to the JobConf Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf will allow applications to pass that to the committers that need it. --- .../committer/MROutputCommitter.java | 1 + .../tez/mapreduce/hadoop/MRJobConfig.java | 2 ++ .../apache/tez/mapreduce/output/MROutput.java | 2 ++ .../tez/mapreduce/output/TestMROutput.java | 22 +++++++++++++++++++ 4 files changed, 27 insertions(+) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 71e5681cbf..5a18d51d06 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -119,6 +119,7 @@ public void abortOutput(VertexStatus.State finalState) throws IOException { || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; } + jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new org.apache.hadoop.mapred.JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() + " using " + (newApiCommitter ? "new" : "old") + "mapred API"); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index e162460773..a5df718026 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -131,6 +131,8 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; + public static final String MR_PARENT_JOB_ID = "mapreduce.parent.job.id"; + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; /** diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 9aeae25bd9..514bd38c49 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -417,6 +418,7 @@ protected List initializeBase() throws IOException, InterruptedException .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput); + jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString()); jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput); diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index bfc09dc9b8..b12b085b93 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -35,6 +35,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -57,6 +58,7 @@ import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.ProcessorContext; @@ -131,6 +133,26 @@ public void testMergeConfig() throws Exception { assertEquals("base-value", mergedConf.get("base-key")); } + @Test + public void testParentJobIDSet() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, TextOutputFormat.class, + tmpDir.getPath()) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(), + new Configuration(false)); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + String invalidJobID = "invalid default"; + String parentJobID = output.jobConf.get(MRJobConfig.MR_PARENT_JOB_ID, invalidJobID); + assertNotEquals(parentJobID,invalidJobID); + assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID),parentJobID); + assertEquals(parentJobID, new JobID(String.valueOf(outputContext.getApplicationId().getClusterTimestamp()),outputContext.getApplicationId().getId()).toString()); + } + @Test(timeout = 5000) public void testOldAPI_TextOutputFormat() throws Exception { Configuration conf = new Configuration(); From 452241a1b7faad81b2497fdd4bcae04999526e58 Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Mon, 18 Mar 2024 13:30:33 -0700 Subject: [PATCH 2/8] Addressing PR comments This commit also adds the DAG identifier to the job UUID to ensure that multiple jobs within the same session will be assigned different UUIDs. --- .../tez/runtime/api/OutputCommitterContext.java | 2 ++ .../dag/app/dag/impl/OutputCommitterContextImpl.java | 10 +++++++++- .../org/apache/tez/dag/app/dag/impl/VertexImpl.java | 3 ++- .../tez/mapreduce/committer/MROutputCommitter.java | 3 ++- .../java/org/apache/tez/mapreduce/common/Utils.java | 6 +++++- .../org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 2 +- .../org/apache/tez/mapreduce/output/MROutput.java | 4 ++-- .../org/apache/tez/mapreduce/output/TestMROutput.java | 11 ++++++----- 8 files changed, 29 insertions(+), 12 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java index d254a6a99e..5b8906d520 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java @@ -78,4 +78,6 @@ public interface OutputCommitterContext { */ public int getVertexIndex(); + public int getDagIdentifier(); + } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java index dc89514950..44579d751b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java @@ -34,6 +34,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext { private final String dagName; private final String vertexName; private final int vertexIdx; + private final int dagIdentifier; private final RootInputLeafOutput output; public OutputCommitterContextImpl(ApplicationId applicationId, @@ -41,7 +42,8 @@ public OutputCommitterContextImpl(ApplicationId applicationId, String dagName, String vertexName, RootInputLeafOutput output, - int vertexIdx) { + int vertexIdx, + int dagIdentifier) { Objects.requireNonNull(applicationId, "applicationId is null"); Objects.requireNonNull(dagName, "dagName is null"); Objects.requireNonNull(vertexName, "vertexName is null"); @@ -52,6 +54,7 @@ public OutputCommitterContextImpl(ApplicationId applicationId, this.vertexName = vertexName; this.output = output; this.vertexIdx = vertexIdx; + this.dagIdentifier = dagIdentifier; } @Override @@ -94,4 +97,9 @@ public int getVertexIndex() { return vertexIdx; } + @Override + public int getDagIdentifier() { + return dagIdentifier; + } + } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f8f2750267..21513d5d15 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2561,7 +2561,8 @@ public Void run() throws Exception { appContext.getCurrentDAG().getName(), vertexName, od, - vertexId.getId()); + vertexId.getId(), + appContext.getCurrentDAG().getID().getId()); OutputCommitter outputCommitter = ReflectionUtils .createClazzInstance(od.getControllerDescriptor().getClassName(), new Class[]{OutputCommitterContext.class}, diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 5a18d51d06..68fb98a9bf 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -18,6 +18,7 @@ package org.apache.tez.mapreduce.committer; +import org.apache.tez.mapreduce.common.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -119,7 +120,7 @@ public void abortOutput(VertexStatus.State finalState) throws IOException { || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; } - jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new org.apache.hadoop.mapred.JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); + jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() + " using " + (newApiCommitter ? "new" : "old") + "mapred API"); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 670ee5db4e..8ba4e62d40 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.JobID; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; @@ -63,5 +64,8 @@ public static Counter getMRCounter(TezCounter tezCounter) { Objects.requireNonNull(tezCounter); return new MRCounters.MRCounter(tezCounter); } - + + public static String createJobUUID(long clusterId, int appId, int dagIdentifier) { + return new JobID(String.valueOf(clusterId), appId).toString()+"_"+String.valueOf(dagIdentifier); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index a5df718026..bfedf8f2e0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -131,7 +131,7 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; - public static final String MR_PARENT_JOB_ID = "mapreduce.parent.job.id"; + public static final String MR_JOB_UUID = "mapreduce.job.uuid"; public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 514bd38c49..dd50b017a0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -33,6 +33,7 @@ import org.apache.tez.common.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.tez.mapreduce.common.Utils; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,6 @@ import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -414,11 +414,11 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); + jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput); - jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString()); jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput); diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index b12b085b93..3d7db7b0e2 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -57,6 +57,7 @@ import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.TezTestUtils; +import org.apache.tez.mapreduce.common.Utils; import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; @@ -134,7 +135,7 @@ public void testMergeConfig() throws Exception { } @Test - public void testParentJobIDSet() throws Exception { + public void testJobUUIDSet() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); DataSinkDescriptor dataSink = MROutput @@ -147,10 +148,10 @@ public void testParentJobIDSet() throws Exception { MROutput output = new MROutput(outputContext, 2); output.initialize(); String invalidJobID = "invalid default"; - String parentJobID = output.jobConf.get(MRJobConfig.MR_PARENT_JOB_ID, invalidJobID); - assertNotEquals(parentJobID,invalidJobID); - assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID),parentJobID); - assertEquals(parentJobID, new JobID(String.valueOf(outputContext.getApplicationId().getClusterTimestamp()),outputContext.getApplicationId().getId()).toString()); + String parentJobID = output.jobConf.get(MRJobConfig.MR_JOB_UUID, invalidJobID); + assertNotEquals(parentJobID, invalidJobID); + assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), parentJobID); + assertEquals(parentJobID, Utils.createJobUUID(outputContext.getApplicationId().getClusterTimestamp(), outputContext.getApplicationId().getId(), outputContext.getDagIdentifier())); } @Test(timeout = 5000) From f9e3b453ca8a3698521b7af1d6e07beb55b51758 Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Mon, 1 Apr 2024 11:40:32 -0700 Subject: [PATCH 3/8] Addressing PR comments #2 --- .../org/apache/tez/mapreduce/committer/MROutputCommitter.java | 4 +++- .../src/main/java/org/apache/tez/mapreduce/common/Utils.java | 2 +- .../java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 3 +++ .../main/java/org/apache/tez/mapreduce/output/MROutput.java | 3 ++- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 68fb98a9bf..6aa655a107 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -79,6 +79,9 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); + jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID( + getContext().getApplicationId().getClusterTimestamp(), + getContext().getApplicationId().getId(), getContext().getDagIdentifier())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); committer = getOutputCommitter(getContext()); jobContext = getJobContextFromVertexContext(getContext()); @@ -120,7 +123,6 @@ public void abortOutput(VertexStatus.State finalState) throws IOException { || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; } - jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() + " using " + (newApiCommitter ? "new" : "old") + "mapred API"); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 8ba4e62d40..91445301c6 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -66,6 +66,6 @@ public static Counter getMRCounter(TezCounter tezCounter) { } public static String createJobUUID(long clusterId, int appId, int dagIdentifier) { - return new JobID(String.valueOf(clusterId), appId).toString()+"_"+String.valueOf(dagIdentifier); + return new JobID(String.valueOf(clusterId), appId).toString() + "_" + String.valueOf(dagIdentifier); } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index bfedf8f2e0..fb51bc896f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -131,6 +131,9 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; + /** + * Can be used by downstream applications to set a DAG-wide UUID for some committers which need one + */ public static final String MR_JOB_UUID = "mapreduce.job.uuid"; public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index dd50b017a0..400eb0ea37 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -414,7 +414,8 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); + jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), + getContext().getApplicationId().getId(), getContext().getDagIdentifier())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), From e13d6f410ed173d3bdf3d67bd4ad62332c3b052f Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Thu, 4 Apr 2024 23:12:13 -0700 Subject: [PATCH 4/8] Rename UUID property Switch UUID property name to the one required by S3A committers. --- .../org/apache/tez/mapreduce/committer/MROutputCommitter.java | 2 +- .../main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 2 +- .../main/java/org/apache/tez/mapreduce/output/MROutput.java | 2 +- .../java/org/apache/tez/mapreduce/output/TestMROutput.java | 3 +-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 6aa655a107..81d50d2579 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -79,7 +79,7 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID( + jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.createJobUUID( getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index fb51bc896f..68aa4e6497 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -134,7 +134,7 @@ public interface MRJobConfig { /** * Can be used by downstream applications to set a DAG-wide UUID for some committers which need one */ - public static final String MR_JOB_UUID = "mapreduce.job.uuid"; + public static final String FS_S3A_COMMITTER_UUID = "fs.s3a.committer.uuid"; public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 400eb0ea37..12a4714067 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -414,7 +414,7 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.MR_JOB_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), + jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), getContext().getApplicationId().getId(), getContext().getDagIdentifier())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 3d7db7b0e2..bc347c5b42 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -35,7 +35,6 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -148,7 +147,7 @@ public void testJobUUIDSet() throws Exception { MROutput output = new MROutput(outputContext, 2); output.initialize(); String invalidJobID = "invalid default"; - String parentJobID = output.jobConf.get(MRJobConfig.MR_JOB_UUID, invalidJobID); + String parentJobID = output.jobConf.get(MRJobConfig.FS_S3A_COMMITTER_UUID, invalidJobID); assertNotEquals(parentJobID, invalidJobID); assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), parentJobID); assertEquals(parentJobID, Utils.createJobUUID(outputContext.getApplicationId().getClusterTimestamp(), outputContext.getApplicationId().getId(), outputContext.getDagIdentifier())); From 5cef95a4730b1c07843bdd41cb75e680bd819abe Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Mon, 8 Apr 2024 11:49:53 -0700 Subject: [PATCH 5/8] Refactor createJobUUID to getDAGID Refactors the implementation to reuse Tez's DAGID type instead of hand-rolling our own. --- .../tez/mapreduce/committer/MROutputCommitter.java | 6 +++--- .../java/org/apache/tez/mapreduce/common/Utils.java | 7 ++++--- .../org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 2 +- .../java/org/apache/tez/mapreduce/output/MROutput.java | 4 ++-- .../org/apache/tez/mapreduce/output/TestMROutput.java | 10 +++++----- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 81d50d2579..906d3d4dcb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -79,9 +79,9 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.createJobUUID( - getContext().getApplicationId().getClusterTimestamp(), - getContext().getApplicationId().getId(), getContext().getDagIdentifier())); + jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID( + getContext().getApplicationId(), + getContext().getDagIdentifier())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); committer = getOutputCommitter(getContext()); jobContext = getJobContextFromVertexContext(getContext()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 91445301c6..21e9f1ae4b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -29,8 +29,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; @Private @@ -65,7 +66,7 @@ public static Counter getMRCounter(TezCounter tezCounter) { return new MRCounters.MRCounter(tezCounter); } - public static String createJobUUID(long clusterId, int appId, int dagIdentifier) { - return new JobID(String.valueOf(clusterId), appId).toString() + "_" + String.valueOf(dagIdentifier); + public static String getDAGID(ApplicationId id, int dagIdentifier) { + return TezDAGID.getInstance(id, dagIdentifier).toString(); } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index 68aa4e6497..9c0f7b59f1 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -132,7 +132,7 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; /** - * Can be used by downstream applications to set a DAG-wide UUID for some committers which need one + * Used by Hadoop's MagicS3Guard and Staging committers to set a job-wide UUID */ public static final String FS_S3A_COMMITTER_UUID = "fs.s3a.committer.uuid"; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 12a4714067..8af2f36f76 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -414,8 +414,8 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.createJobUUID(getContext().getApplicationId().getClusterTimestamp(), - getContext().getApplicationId().getId(), getContext().getDagIdentifier())); + jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(), + getContext().getDagIdentifier())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index bc347c5b42..585759671d 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -146,11 +146,11 @@ public void testJobUUIDSet() throws Exception { new Configuration(false)); MROutput output = new MROutput(outputContext, 2); output.initialize(); - String invalidJobID = "invalid default"; - String parentJobID = output.jobConf.get(MRJobConfig.FS_S3A_COMMITTER_UUID, invalidJobID); - assertNotEquals(parentJobID, invalidJobID); - assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), parentJobID); - assertEquals(parentJobID, Utils.createJobUUID(outputContext.getApplicationId().getClusterTimestamp(), outputContext.getApplicationId().getId(), outputContext.getDagIdentifier())); + String invalidDAGID = "invalid default"; + String dagID = output.jobConf.get(MRJobConfig.FS_S3A_COMMITTER_UUID, invalidDAGID); + assertNotEquals(dagID, invalidDAGID); + assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID); + assertEquals(dagID, Utils.getDAGID(outputContext.getApplicationId(), outputContext.getDagIdentifier())); } @Test(timeout = 5000) From 83cd25f4012009ed88398475516cd6ff15a1fafc Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Tue, 28 May 2024 12:06:44 -0700 Subject: [PATCH 6/8] Address review comments --- .../dag/app/dag/impl/OutputCommitterContextImpl.java | 10 ++++------ .../org/apache/tez/dag/app/dag/impl/VertexImpl.java | 5 +++-- .../tez/mapreduce/committer/MROutputCommitter.java | 2 +- .../org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 4 ++-- .../java/org/apache/tez/mapreduce/output/MROutput.java | 2 +- .../org/apache/tez/mapreduce/output/TestMROutput.java | 2 +- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java index 44579d751b..c6ffe08bf1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java @@ -38,12 +38,10 @@ public class OutputCommitterContextImpl implements OutputCommitterContext { private final RootInputLeafOutput output; public OutputCommitterContextImpl(ApplicationId applicationId, - int dagAttemptNumber, - String dagName, - String vertexName, - RootInputLeafOutput output, - int vertexIdx, - int dagIdentifier) { + int dagAttemptNumber, + String dagName, + String vertexName, + int dagIdentifier, int vertexIdx, RootInputLeafOutput output) { Objects.requireNonNull(applicationId, "applicationId is null"); Objects.requireNonNull(dagName, "dagName is null"); Objects.requireNonNull(vertexName, "vertexName is null"); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 21513d5d15..c7cf176af7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2560,9 +2560,10 @@ public Void run() throws Exception { appContext.getApplicationAttemptId().getAttemptId(), appContext.getCurrentDAG().getName(), vertexName, - od, + appContext.getCurrentDAG().getID().getId(), vertexId.getId(), - appContext.getCurrentDAG().getID().getId()); + od + ); OutputCommitter outputCommitter = ReflectionUtils .createClazzInstance(od.getControllerDescriptor().getClassName(), new Class[]{OutputCommitterContext.class}, diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 906d3d4dcb..5afb4e0e43 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -79,7 +79,7 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID( + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID( getContext().getApplicationId(), getContext().getDagIdentifier())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index 9c0f7b59f1..066f36cc6c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -132,9 +132,9 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; /** - * Used by Hadoop's MagicS3Guard and Staging committers to set a job-wide UUID + * Used by committers to set a job-wide UUID */ - public static final String FS_S3A_COMMITTER_UUID = "fs.s3a.committer.uuid"; + public static final String JOB_COMMITTER_UUID = "job.committer.uuid"; public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 8af2f36f76..91ec9ac797 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -414,7 +414,7 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(), + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(), getContext().getDagIdentifier())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 585759671d..27b72aac7c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -147,7 +147,7 @@ public void testJobUUIDSet() throws Exception { MROutput output = new MROutput(outputContext, 2); output.initialize(); String invalidDAGID = "invalid default"; - String dagID = output.jobConf.get(MRJobConfig.FS_S3A_COMMITTER_UUID, invalidDAGID); + String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID); assertNotEquals(dagID, invalidDAGID); assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID); assertEquals(dagID, Utils.getDAGID(outputContext.getApplicationId(), outputContext.getDagIdentifier())); From c0c29b48648cc6ed3b5a8b01f94eb40ec2e6dc10 Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Wed, 26 Jun 2024 12:30:43 -0700 Subject: [PATCH 7/8] Refactor to address more review comments --- .../dag/app/dag/impl/OutputCommitterContextImpl.java | 4 +++- .../tez/mapreduce/committer/MROutputCommitter.java | 4 +--- .../java/org/apache/tez/mapreduce/common/Utils.java | 10 ++++++++-- .../java/org/apache/tez/mapreduce/output/MROutput.java | 3 +-- .../org/apache/tez/mapreduce/output/TestMROutput.java | 2 +- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java index c6ffe08bf1..06be989b9e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java @@ -41,7 +41,9 @@ public OutputCommitterContextImpl(ApplicationId applicationId, int dagAttemptNumber, String dagName, String vertexName, - int dagIdentifier, int vertexIdx, RootInputLeafOutput output) { + int dagIdentifier, + int vertexIdx, + RootInputLeafOutput output) { Objects.requireNonNull(applicationId, "applicationId is null"); Objects.requireNonNull(dagName, "dagName is null"); Objects.requireNonNull(vertexName, "vertexName is null"); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 5afb4e0e43..4a648dc901 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -79,9 +79,7 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID( - getContext().getApplicationId(), - getContext().getDagIdentifier())); + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); committer = getOutputCommitter(getContext()); jobContext = getJobContextFromVertexContext(getContext()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 21e9f1ae4b..4fa0365850 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -33,6 +33,8 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; +import org.apache.tez.runtime.api.OutputCommitterContext; +import org.apache.tez.runtime.api.OutputContext; @Private public final class Utils { @@ -66,7 +68,11 @@ public static Counter getMRCounter(TezCounter tezCounter) { return new MRCounters.MRCounter(tezCounter); } - public static String getDAGID(ApplicationId id, int dagIdentifier) { - return TezDAGID.getInstance(id, dagIdentifier).toString(); + public static String getDAGID(OutputCommitterContext context) { + return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString(); + } + + public static String getDAGID(OutputContext context) { + return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString(); } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 91ec9ac797..b8ac1b3a54 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -414,8 +414,7 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); - jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(), - getContext().getDagIdentifier())); + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 27b72aac7c..3359a6eda2 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -150,7 +150,7 @@ public void testJobUUIDSet() throws Exception { String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID); assertNotEquals(dagID, invalidDAGID); assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID); - assertEquals(dagID, Utils.getDAGID(outputContext.getApplicationId(), outputContext.getDagIdentifier())); + assertEquals(dagID, Utils.getDAGID(outputContext)); } @Test(timeout = 5000) From c56e926ec09861765bd3bbd45a448fe26dbf6d63 Mon Sep 17 00:00:00 2001 From: vnarayanan Date: Wed, 10 Jul 2024 11:08:59 -0700 Subject: [PATCH 8/8] Address checkstyle report --- .../src/main/java/org/apache/tez/mapreduce/common/Utils.java | 1 - .../main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 4fa0365850..85483fc598 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index 066f36cc6c..f1183742fc 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -132,7 +132,7 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; /** - * Used by committers to set a job-wide UUID + * Used by committers to set a job-wide UUID. */ public static final String JOB_COMMITTER_UUID = "job.committer.uuid";