Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ TezClientForTest configureAndCreateTezClient(Map<String, LocalResource> lrs, boo
}

@Test (timeout = 5000)
public void testTezclientApp() throws Exception {
testTezClient(false, true);
public void testTezClientApp() throws Exception {
testTezClient(false, true, "testTezClientApp");
}

@Test (timeout = 5000)
public void testTezclientSession() throws Exception {
testTezClient(true, true);
public void testTezClientSession() throws Exception {
testTezClient(true, true, "testTezClientSession");
}

@Test (timeout = 5000)
Expand Down Expand Up @@ -246,7 +246,7 @@ private void _testTezClientSessionLargeDAGPlan(int maxIPCMsgSize, int payloadSiz
ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("P");
processorDescriptor.setUserPayload(UserPayload.create(ByteBuffer.allocate(payloadSize)));
Vertex vertex = Vertex.create("Vertex", processorDescriptor, 1, Resource.newInstance(1, 1));
DAG dag = DAG.create("DAG").addVertex(vertex);
DAG dag = DAG.create("DAG-testTezClientSessionLargeDAGPlan").addVertex(vertex);

client.start();
client.addAppMasterLocalFiles(localResourceMap);
Expand Down Expand Up @@ -277,7 +277,7 @@ private void _testTezClientSessionLargeDAGPlan(int maxIPCMsgSize, int payloadSiz
@Test (timeout = 5000)
public void testGetClient() throws Exception {
/* BEGIN first TezClient usage without calling stop() */
TezClientForTest client = testTezClient(true, false);
TezClientForTest client = testTezClient(true, false, "testGetClient");
/* END first TezClient usage without calling stop() */

/* BEGIN reuse of AM from new TezClient */
Expand All @@ -295,7 +295,7 @@ public void testGetClient() throws Exception {
LocalResourceVisibility.PUBLIC, 1, 1));
Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
Resource.newInstance(1, 1));
DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
DAG dag = DAG.create("DAG-testGetClient").addVertex(vertex).addTaskLocalFiles(lrDAG);

//Bind TezClient to existing app and submit a dag
DAGClient dagClient = client2.getClient(existingAppId).submitDAG(dag);
Expand All @@ -317,7 +317,7 @@ public void testGetClient() throws Exception {
/* END reuse of AM from new TezClient */
}

public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) throws Exception {
public TezClientForTest testTezClient(boolean isSession, boolean shouldStop, String dagName) throws Exception {
Map<String, LocalResource> lrs = Maps.newHashMap();
String lrName1 = "LR1";
lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
Expand Down Expand Up @@ -351,7 +351,7 @@ public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) thr
LocalResourceVisibility.PUBLIC, 1, 1));
Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
Resource.newInstance(1, 1));
DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
DAG dag = DAG.create("DAG-" + dagName).addVertex(vertex).addTaskLocalFiles(lrDAG);
if (!isSession) {
when(client.sessionAmProxy.getAMStatus(any(), any()))
.thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.SHUTDOWN).build());
Expand Down Expand Up @@ -391,7 +391,7 @@ public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) thr

when(client.mockYarnClient.getApplicationReport(appId2).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
dag = DAG.create("DAG").addVertex(
dag = DAG.create("DAG-2-" + dagName).addVertex(
Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)));
dagClient = client.submitDAG(dag);

Expand Down Expand Up @@ -603,7 +603,8 @@ public void testMultipleSubmissionsJob(boolean isSession) throws Exception {
LocalResourceVisibility.PUBLIC, 1, 1));
Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
Resource.newInstance(1, 1)).addTaskLocalFiles(lrVertex);
DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
DAG dag =
DAG.create("DAG-testMultipleSubmissionsJob-session-" + isSession).addVertex(vertex).addTaskLocalFiles(lrDAG);

// the dag resource will be added to the vertex once
client1.submitDAG(dag);
Expand Down Expand Up @@ -694,7 +695,7 @@ public void testSubmitDAGAppFailed() throws Exception {

Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
Resource.newInstance(1, 1));
DAG dag = DAG.create("DAG").addVertex(vertex);
DAG dag = DAG.create("DAG-testSubmitDAGAppFailed").addVertex(vertex);

try {
client.submitDAG(dag);
Expand Down Expand Up @@ -884,7 +885,7 @@ public void testClientResubmit() throws Exception {
Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1,
Resource.newInstance(1, 1));
vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
DAG dag = DAG.create("DAG-testClientResubmit").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
for (int i = 0; i < 3; ++i) {
try {
client.submitDAG(dag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public void testSessionTokenInAmClc() throws IOException, YarnException {
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGING_DIR.getAbsolutePath());

ApplicationId appId = ApplicationId.newInstance(1000, 1);
DAG dag = DAG.create("testdag");
DAG dag = DAG.create("testdag-testSessionTokenInAmClc");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));

Expand Down Expand Up @@ -449,7 +449,7 @@ public void testAMLoggingOptsSimple() throws IOException, YarnException {
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
DAG dag = DAG.create("DAG-testAMLoggingOptsSimple");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
Expand Down Expand Up @@ -490,7 +490,7 @@ public void testAMLoggingOptsPerLogger() throws IOException, YarnException {
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
DAG dag = DAG.create("DAG-testAMLoggingOptsPerLogger");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
Expand Down
16 changes: 8 additions & 8 deletions tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testDuplicatedVertices() {
dummyTaskCount, dummyTaskResource);
Vertex v2 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
DAG dag = DAG.create("testDAG");
DAG dag = DAG.create("DAG-testDuplicatedVertices");
dag.addVertex(v1);
try {
dag.addVertex(v2);
Expand All @@ -74,7 +74,7 @@ public void testDuplicatedEdges() {
SchedulingType.CONCURRENT, OutputDescriptor.create("output"),
InputDescriptor.create("input")));

DAG dag = DAG.create("testDAG");
DAG dag = DAG.create("DAG-testDuplicatedEdges");
dag.addVertex(v1);
dag.addVertex(v2);
dag.addEdge(edge1);
Expand All @@ -96,7 +96,7 @@ public void testDuplicatedVertexGroup() {
Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);

DAG dag = DAG.create("testDAG");
DAG dag = DAG.create("DAG-testDuplicatedVertexGroup");
dag.createVertexGroup("group_1", v1, v2);

try {
Expand All @@ -123,7 +123,7 @@ public void testDuplicatedGroupInputEdge() {
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);

DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testDuplicatedGroupInputEdge");
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);

Expand Down Expand Up @@ -156,7 +156,7 @@ public void testDuplicatedGroupInputEdge() {

@Test(timeout = 5000)
public void testDAGConf() {
DAG dag = DAG.create("dag1");
DAG dag = DAG.create("DAG-testDAGConf");
// it's OK to set custom configuration
dag.setConf("unknown_conf", "value");

Expand Down Expand Up @@ -281,7 +281,7 @@ public void testDuplicatedOutput_1() {

@Test(timeout = 5000)
public void testDuplicatedOutput_2() {
DAG dag = DAG.create("dag1");
DAG dag = DAG.create("DAG-testDuplicatedOutput_2");
Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
DataSinkDescriptor dataSink =
DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null);
Expand Down Expand Up @@ -354,7 +354,7 @@ public void testRecreateDAG() {
Resource.newInstance(1, 1));
Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
Resource.newInstance(1, 1));
DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
DAG dag = DAG.create("DAG-testRecreateDAG").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);

TezConfiguration tezConf = new TezConfiguration();
DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false);
Expand All @@ -375,7 +375,7 @@ public void testCreateDAGForHistoryLogLevel() {
Resource.newInstance(1, 1));
Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
Resource.newInstance(1, 1));
DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
DAG dag = DAG.create("DAG-testCreateDAGForHistoryLogLevel").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);

TezConfiguration tezConf = new TezConfiguration();

Expand Down
16 changes: 8 additions & 8 deletions tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testBasicJobPlanSerde() throws IOException {

@Test(timeout = 5000)
public void testEdgeManagerSerde() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testEdgeManagerSerde");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1")
.setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2")
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testEdgeManagerSerde() {

@Test(timeout = 5000)
public void testUserPayloadSerde() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testUserPayloadSerde");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2").
Expand Down Expand Up @@ -205,7 +205,7 @@ public void testUserPayloadSerde() {

@Test(timeout = 5000)
public void userVertexOrderingIsMaintained() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-userVertexOrderingIsMaintained");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2").
Expand Down Expand Up @@ -278,7 +278,7 @@ public void userVertexOrderingIsMaintained() {

@Test (timeout=5000)
public void testCredentialsSerde() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testCredentialsSerde");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2").
Expand Down Expand Up @@ -322,7 +322,7 @@ public void testCredentialsSerde() {

@Test(timeout = 5000)
public void testInvalidExecContext_1() {
DAG dag = DAG.create("dag1");
DAG dag = DAG.create("DAG-testInvalidExecContext_1");
dag.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
dag.addVertex(v1);
Expand Down Expand Up @@ -364,7 +364,7 @@ public void testInvalidExecContext_2() {
VertexExecutionContext.create("plugin", "plugin", "invalidplugin");


DAG dag = DAG.create("dag1");
DAG dag = DAG.create("DAG-testInvalidExecContext_2");
dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true));
Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
dag.addVertex(v1);
Expand Down Expand Up @@ -429,7 +429,7 @@ public void testInvalidExecContext_2() {

@Test(timeout = 5000)
public void testServiceDescriptorPropagation() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testServiceDescriptorPropagation");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2").
Expand Down Expand Up @@ -492,7 +492,7 @@ public void testServiceDescriptorPropagation() {

@Test(timeout = 5000)
public void testInvalidJavaOpts() {
DAG dag = DAG.create("testDag");
DAG dag = DAG.create("DAG-testInvalidJavaOpts");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1")
.setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1));
Expand Down
Loading