From 3fa02207de356164e2b374500396bd4ae1669a5b Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Sep 2025 09:57:29 +0200 Subject: [PATCH 1/3] TEZ-4008: Pluggable AM FrameworkServices and AmExtensions (2/3) --- tez-api/pom.xml | 5 + .../apache/tez/client/registry/AMRecord.java | 114 ++++++++++++++++++ .../apache/tez/dag/api/TezConfiguration.java | 7 ++ .../org/apache/tez/dag/api/TezConstants.java | 3 + .../org/apache/tez/client/LocalClient.java | 2 +- .../dag/api/client/registry/AMRegistry.java | 51 ++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 34 +++++- .../apache/tez/dag/app/LocalDAGAppMaster.java | 2 +- .../apache/tez/dag/utils/AMRegistryUtils.java | 46 +++++++ .../api/client/registry/TestAMRegistry.java | 101 ++++++++++++++++ .../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../apache/tez/dag/app/TestDAGAppMaster.java | 6 +- 12 files changed, 365 insertions(+), 8 deletions(-) create mode 100644 tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java create mode 100644 tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java diff --git a/tez-api/pom.xml b/tez-api/pom.xml index 17836a49a8..7bd0200858 100644 --- a/tez-api/pom.xml +++ b/tez-api/pom.xml @@ -73,6 +73,11 @@ org.apache.hadoop hadoop-yarn-client + + org.apache.hadoop + hadoop-yarn-registry + ${hadoop.version} + org.apache.commons commons-collections4 diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java new file mode 100644 index 0000000000..aad8182c3e --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import com.google.common.base.Preconditions; + +/** + * Represents an instance of an AM (DAGClientServer) in the AM registry + */ +@InterfaceAudience.Public +public class AMRecord { + private ApplicationId appId; + private String host; + private int port; + private String id; + private final static String APP_ID_RECORD_KEY = "appId"; + private final static String HOST_RECORD_KEY = "host"; + private final static String PORT_RECORD_KEY = "port"; + private final static String OPAQUE_ID_KEY = "id"; + + public AMRecord(ApplicationId appId, String host, int port, String id) { + Preconditions.checkNotNull(appId); + Preconditions.checkNotNull(host); + this.appId = appId; + this.host = host; + this.port = port; + //If id is not provided, convert to empty string + this.id = (id == null) ? "" : id; + } + + public AMRecord(AMRecord other) { + Preconditions.checkNotNull(other); + this.appId = other.getApplicationId(); + this.host = other.getHost(); + this.port = other.getPort(); + this.id = other.getId(); + } + + public AMRecord(ServiceRecord serviceRecord) { + String serviceAppId = serviceRecord.get(APP_ID_RECORD_KEY); + Preconditions.checkNotNull(serviceAppId); + this.appId = ApplicationId.fromString(serviceAppId); + String serviceHost = serviceRecord.get(HOST_RECORD_KEY); + Preconditions.checkNotNull(serviceHost); + this.host = serviceHost; + String servicePort = serviceRecord.get(PORT_RECORD_KEY); + this.port = Integer.parseInt(servicePort); + String serviceId = serviceRecord.get(OPAQUE_ID_KEY); + Preconditions.checkNotNull(serviceId); + this.id = serviceId; + } + + public ApplicationId getApplicationId() { + return appId; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getId() { return id; } + + @Override + public boolean equals(Object other) { + if(other instanceof AMRecord) { + AMRecord otherRecord = (AMRecord) other; + return appId.equals(otherRecord.appId) + && host.equals(otherRecord.host) + && port == otherRecord.port + && id.equals(otherRecord.id); + } else { + return false; + } + } + + @Override + public int hashCode() { + return appId.hashCode() * host.hashCode() * id.hashCode() + port; + } + + public ServiceRecord toServiceRecord() { + ServiceRecord serviceRecord = new ServiceRecord(); + serviceRecord.set(APP_ID_RECORD_KEY, appId); + serviceRecord.set(HOST_RECORD_KEY, host); + serviceRecord.set(PORT_RECORD_KEY, port); + serviceRecord.set(OPAQUE_ID_KEY, id); + return serviceRecord; + } + +} diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index a95099dbe0..adc4ce493d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2349,4 +2349,11 @@ static Set getPropertySet() { @ConfigurationScope(Scope.AM) @ConfigurationProperty public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs"; + + /** + * String value. The class to be used for the AM registry. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_REGISTRY_CLASS = TEZ_AM_PREFIX + "registry.class"; } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 9bf5e0503d..29477edf0c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -102,6 +102,9 @@ public final class TezConstants { /// Version-related Environment variables public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION"; + //Arbitrary opaque ID to identify AM instances from AMRegistryClient + public static final String TEZ_AM_UUID = "UUID"; + private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn"; private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber"; diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 9b65f2f452..79f9f15a64 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -415,7 +415,7 @@ protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemp versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto) : new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); + versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, null); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java new file mode 100644 index 0000000000..76fbe4439e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client.registry; + +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.client.registry.AMRecord; + +/** + * Base class for AMRegistry implementation + * Implementation class is configured by tez.am.registry.class + * Implementations should implement relevant service lifecycle operations: + * init, serviceStart, serviceStop, etc.. + * + * init/serviceStart will be invoked during DAGAppMaster.serviceInit + * + * serviceStop will invoked on DAGAppMaster shutdown + */ +public abstract class AMRegistry extends AbstractService { + + /* Implementations should provide a public no-arg constructor */ + protected AMRegistry(String name) { + super(name); + } + + /* Under typical usage, add will be called once automatically with an AMRecord + for the DAGClientServer servicing an AM + */ + public abstract void add(AMRecord server) throws Exception; + + /* + Under typical usage, implementations should remove any stale AMRecords upon serviceStop + */ + public abstract void remove(AMRecord server) throws Exception; + +} \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a8c17ed7de..aa4547833f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -97,6 +97,7 @@ import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; +import org.apache.tez.client.registry.AMRecord; import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.AsyncDispatcherConcurrent; import org.apache.tez.common.ContainerSignatureMatcher; @@ -125,6 +126,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientHandler; import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.api.client.registry.AMRegistry; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; @@ -182,6 +184,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.AMRegistryUtils; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; @@ -244,6 +247,7 @@ public class DAGAppMaster extends AbstractService { private String appName; private final ApplicationAttemptId appAttemptID; private final ContainerId containerID; + private String amUUID; private final String nmHost; private final int nmPort; private final int nmHttpPort; @@ -350,7 +354,8 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String [] localDirs, String[] logDirs, String clientVersion, - Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { + Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, + String amUUID) { super(DAGAppMaster.class.getName()); this.mdcContext = LoggingUtils.setupLog4j(); this.clock = clock; @@ -358,6 +363,7 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, this.appSubmitTime = appSubmitTime; this.appAttemptID = applicationAttemptId; this.containerID = containerId; + this.amUUID = amUUID; this.nmHost = nmHost; this.nmPort = nmPort; this.nmHttpPort = nmHttpPort; @@ -632,6 +638,10 @@ protected void serviceInit(final Configuration conf) throws Exception { .setDaemon(true).setNameFormat("App Shared Pool - #%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor); + AMRegistry amRegistry = AMRegistryUtils.createAMRegistry(conf); + initAmRegistry(appAttemptID.getApplicationId(), amUUID, amRegistry, clientRpcServer); + addIfService(amRegistry, false); + initServices(conf); super.serviceInit(conf); @@ -659,6 +669,25 @@ protected void initClientRpcServer() { addIfService(clientRpcServer, true); } + @VisibleForTesting + public static void initAmRegistry(ApplicationId appId, String amUUID, AMRegistry amRegistry, DAGClientServer dagClientServer) throws Exception { + if(amRegistry != null) { + dagClientServer.registerServiceListener((service) -> { + if (service.isInState(STATE.STARTED)) { + AMRecord amRecord = AMRegistryUtils.recordForDAGClientServer( + appId, + amUUID, + dagClientServer); + try { + amRegistry.add(amRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + } + @VisibleForTesting protected DAGAppMasterShutdownHandler createShutdownHandler() { return new DAGAppMasterShutdownHandler(); @@ -2382,6 +2411,7 @@ public static void main(String[] args) { String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV); + String amUUID = System.getenv(TezConstants.TEZ_AM_UUID); if (clientVersion == null) { clientVersion = VersionInfo.UNKNOWN; } @@ -2446,7 +2476,7 @@ public static void main(String[] args) { System.getenv(Environment.PWD.name()), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())), - clientVersion, credentials, jobUserName, amPluginDescriptorProto); + clientVersion, credentials, jobUserName, amPluginDescriptorProto, amUUID); ShutdownHookManager.get().addShutdownHook( new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java index e0c8443577..71eafd8965 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java @@ -37,7 +37,7 @@ public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName, - pluginDescriptorProto); + pluginDescriptorProto, null); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java new file mode 100644 index 0000000000..d833fb1344 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.utils; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.api.client.registry.AMRegistry; + +public class AMRegistryUtils { + public static AMRecord recordForDAGClientServer(ApplicationId appId, String opaqueId, DAGClientServer dagClientServer) { + InetSocketAddress address = dagClientServer.getBindAddress(); + return new AMRecord(appId, address.getHostName(), address.getPort(), opaqueId); + } + + public static AMRegistry createAMRegistry(Configuration conf) throws Exception { + String tezAMRegistryClass = conf.get(TezConfiguration.TEZ_AM_REGISTRY_CLASS); + if(tezAMRegistryClass == null) { + return null; + } else { + AMRegistry amRegistry = ReflectionUtils.createClazzInstance(tezAMRegistryClass); + return amRegistry; + } + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java new file mode 100644 index 0000000000..8285d1553c --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client.registry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.*; + +import java.net.InetSocketAddress; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClientHandler; +import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.dag.utils.AMRegistryUtils; + +import org.junit.Test; + +public class TestAMRegistry { + + @Test(timeout = 5000) + public void testAMRegistryFactory() throws Exception { + Configuration conf = new Configuration(); + AMRegistry amRegistry = AMRegistryUtils.createAMRegistry(conf); + assertNull(amRegistry); + String className = "org.apache.tez.dag.api.client.registry.TestAMRegistry$SkeletonAMRegistry"; + conf.set(TezConfiguration.TEZ_AM_REGISTRY_CLASS, className); + amRegistry = AMRegistryUtils.createAMRegistry(conf); + assertEquals(amRegistry.getClass().getName(), className); + } + + @Test(timeout = 5000) + public void testRecordForDagServer() { + DAGClientServer dagClientServer = mock(DAGClientServer.class); + when(dagClientServer.getBindAddress()).thenReturn(new InetSocketAddress("testhost", 1000)); + ApplicationId appId = ApplicationId.newInstance(0, 1); + String id = UUID.randomUUID().toString(); + AMRecord record = AMRegistryUtils.recordForDAGClientServer( + appId, + id, + dagClientServer + ); + assertEquals(record.getApplicationId(), appId); + assertEquals(record.getHost(), "testhost"); + assertEquals(record.getPort(), 1000); + assertEquals(record.getId(), id); + } + + @Test(timeout = 20000) + public void testAMRegistryService() throws Exception { + DAGClientHandler dagClientHandler = mock(DAGClientHandler.class); + ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); + ApplicationId appId = ApplicationId.newInstance(0, 1); + String uuid = UUID.randomUUID().toString(); + when(appAttemptId.getApplicationId()).thenReturn(appId); + AMRegistry amRegistry = mock(AMRegistry.class); + FileSystem fs = mock(FileSystem.class); + DAGClientServer dagClientServer = new DAGClientServer(dagClientHandler, appAttemptId, fs); + try { + DAGAppMaster.initAmRegistry(appAttemptId.getApplicationId(), uuid, amRegistry, dagClientServer); + dagClientServer.init(new Configuration()); + dagClientServer.start(); + AMRecord record = + AMRegistryUtils.recordForDAGClientServer(appId, uuid, dagClientServer); + verify(amRegistry, times(1)).add(record); + } finally { + dagClientServer.stop(); + } + } + + public static class SkeletonAMRegistry extends AMRegistry { + public SkeletonAMRegistry() { + super("SkeletonAMRegistry"); + } + @Override public void add(AMRecord server) throws Exception { } + @Override public void remove(AMRecord server) throws Exception { } + } + +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index fbab519376..213d85b892 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -500,7 +500,7 @@ public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId c Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), - credentials, jobUserName, null); + credentials, jobUserName, null, null); shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.launcherGoFlag = launcherGoFlag; this.initFailFlag = initFailFlag; diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 40a8e20cd5..85a8248b95 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -552,7 +552,7 @@ public void testBadProgress() throws Exception { TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, new String[] {TEST_DIR.toString()}, new TezApiVersionInfo().getVersion(), amCreds, - "someuser", null)); + "someuser", null, null)); when(am.getState()).thenReturn(DAGAppMasterState.RUNNING); am.init(conf); am.start(); @@ -637,7 +637,7 @@ private void testDagCredentials(boolean doMerge) throws IOException { TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, new String[] {TEST_DIR.toString()}, new TezApiVersionInfo().getVersion(), amCreds, - "someuser", null); + "someuser", null, null); am.init(conf); am.start(); @@ -758,7 +758,7 @@ public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) { super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, - new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null); + new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null, null); } public static Credentials createCredentials() { From 6ff2aa5032c6d3c0b732a0bd171255937acd3add Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Sep 2025 12:52:43 +0200 Subject: [PATCH 2/3] various PR fixes (checkstyle, spotbugs) --- tez-api/findbugs-exclude.xml | 20 +++++++ .../apache/tez/client/registry/AMRecord.java | 54 ++++++++----------- .../tez/client/registry/package-info.java | 24 +++++++++ .../apache/tez/dag/api/TezConfiguration.java | 6 +-- .../dag/api/client/registry/AMRegistry.java | 24 ++++----- .../dag/api/client/registry/package-info.java | 24 +++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 10 ++-- .../apache/tez/dag/utils/AMRegistryUtils.java | 15 +++--- .../api/client/registry/TestAMRegistry.java | 18 +++---- 9 files changed, 120 insertions(+), 75 deletions(-) create mode 100644 tez-api/src/main/java/org/apache/tez/client/registry/package-info.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml index 10d27f7119..de8f3824cd 100644 --- a/tez-api/findbugs-exclude.xml +++ b/tez-api/findbugs-exclude.xml @@ -131,4 +131,24 @@ + + + + + + + + + + + + + + + + + + diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java index aad8182c3e..fbfa795e6d 100644 --- a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java +++ b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java @@ -22,25 +22,23 @@ import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.yarn.api.records.ApplicationId; -import com.google.common.base.Preconditions; /** - * Represents an instance of an AM (DAGClientServer) in the AM registry + * Represents an instance of an AM (DAGClientServer) in the AM registry. */ @InterfaceAudience.Public public class AMRecord { - private ApplicationId appId; - private String host; - private int port; - private String id; - private final static String APP_ID_RECORD_KEY = "appId"; - private final static String HOST_RECORD_KEY = "host"; - private final static String PORT_RECORD_KEY = "port"; - private final static String OPAQUE_ID_KEY = "id"; + private static final String APP_ID_RECORD_KEY = "appId"; + private static final String HOST_RECORD_KEY = "host"; + private static final String PORT_RECORD_KEY = "port"; + private static final String OPAQUE_ID_KEY = "id"; + + private final ApplicationId appId; + private final String host; + private final int port; + private final String id; public AMRecord(ApplicationId appId, String host, int port, String id) { - Preconditions.checkNotNull(appId); - Preconditions.checkNotNull(host); this.appId = appId; this.host = host; this.port = port; @@ -49,7 +47,6 @@ public AMRecord(ApplicationId appId, String host, int port, String id) { } public AMRecord(AMRecord other) { - Preconditions.checkNotNull(other); this.appId = other.getApplicationId(); this.host = other.getHost(); this.port = other.getPort(); @@ -57,17 +54,10 @@ public AMRecord(AMRecord other) { } public AMRecord(ServiceRecord serviceRecord) { - String serviceAppId = serviceRecord.get(APP_ID_RECORD_KEY); - Preconditions.checkNotNull(serviceAppId); - this.appId = ApplicationId.fromString(serviceAppId); - String serviceHost = serviceRecord.get(HOST_RECORD_KEY); - Preconditions.checkNotNull(serviceHost); - this.host = serviceHost; - String servicePort = serviceRecord.get(PORT_RECORD_KEY); - this.port = Integer.parseInt(servicePort); - String serviceId = serviceRecord.get(OPAQUE_ID_KEY); - Preconditions.checkNotNull(serviceId); - this.id = serviceId; + this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY)); + this.host = serviceRecord.get(HOST_RECORD_KEY); + this.port = Integer.parseInt(serviceRecord.get(PORT_RECORD_KEY)); + this.id = serviceRecord.get(OPAQUE_ID_KEY); } public ApplicationId getApplicationId() { @@ -82,12 +72,13 @@ public int getPort() { return port; } - public String getId() { return id; } + public String getId() { + return id; + } @Override public boolean equals(Object other) { - if(other instanceof AMRecord) { - AMRecord otherRecord = (AMRecord) other; + if (other instanceof AMRecord otherRecord) { return appId.equals(otherRecord.appId) && host.equals(otherRecord.host) && port == otherRecord.port @@ -97,11 +88,6 @@ public boolean equals(Object other) { } } - @Override - public int hashCode() { - return appId.hashCode() * host.hashCode() * id.hashCode() + port; - } - public ServiceRecord toServiceRecord() { ServiceRecord serviceRecord = new ServiceRecord(); serviceRecord.set(APP_ID_RECORD_KEY, appId); @@ -111,4 +97,8 @@ public ServiceRecord toServiceRecord() { return serviceRecord; } + @Override + public int hashCode() { + return appId.hashCode() * host.hashCode() * id.hashCode() + port; + } } diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java b/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java new file mode 100644 index 0000000000..f08bdfc8db --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Public +@Evolving +package org.apache.tez.client.registry; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; \ No newline at end of file diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index adc4ce493d..2176141d06 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2353,7 +2353,7 @@ static Set getPropertySet() { /** * String value. The class to be used for the AM registry. */ - @ConfigurationScope(Scope.AM) - @ConfigurationProperty - public static final String TEZ_AM_REGISTRY_CLASS = TEZ_AM_PREFIX + "registry.class"; + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_REGISTRY_CLASS = TEZ_AM_PREFIX + "registry.class"; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java index 76fbe4439e..8c6ab58984 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java @@ -22,30 +22,26 @@ import org.apache.tez.client.registry.AMRecord; /** - * Base class for AMRegistry implementation - * Implementation class is configured by tez.am.registry.class - * Implementations should implement relevant service lifecycle operations: - * init, serviceStart, serviceStop, etc.. + * Base class for AMRegistry implementations. + * The specific implementation class is configured by `tez.am.registry.class`. * - * init/serviceStart will be invoked during DAGAppMaster.serviceInit - * - * serviceStop will invoked on DAGAppMaster shutdown + * Implementations should handle the relevant service lifecycle operations: + * `init`, `serviceStart`, `serviceStop`, etc. + * - `init` and `serviceStart` are invoked during `DAGAppMaster.serviceInit`. + * - `serviceStop` is invoked on `DAGAppMaster` shutdown. */ public abstract class AMRegistry extends AbstractService { - /* Implementations should provide a public no-arg constructor */ + /* Implementations should provide a public no-arg constructor. */ protected AMRegistry(String name) { super(name); } - /* Under typical usage, add will be called once automatically with an AMRecord - for the DAGClientServer servicing an AM - */ + /* Under typical usage, add() will be called once automatically with an AMRecord + for the DAGClientServer that services an AM. */ public abstract void add(AMRecord server) throws Exception; - /* - Under typical usage, implementations should remove any stale AMRecords upon serviceStop - */ + /* Under typical usage, implementations should remove any stale AMRecords upon serviceStop. */ public abstract void remove(AMRecord server) throws Exception; } \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java new file mode 100644 index 0000000000..943f5197bd --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Public +@Evolving +package org.apache.tez.dag.api.client.registry; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index aa4547833f..99579c7ff7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -670,14 +670,12 @@ protected void initClientRpcServer() { } @VisibleForTesting - public static void initAmRegistry(ApplicationId appId, String amUUID, AMRegistry amRegistry, DAGClientServer dagClientServer) throws Exception { - if(amRegistry != null) { + public static void initAmRegistry(ApplicationId appId, String amUUID, AMRegistry amRegistry, + DAGClientServer dagClientServer) { + if (amRegistry != null) { dagClientServer.registerServiceListener((service) -> { if (service.isInState(STATE.STARTED)) { - AMRecord amRecord = AMRegistryUtils.recordForDAGClientServer( - appId, - amUUID, - dagClientServer); + AMRecord amRecord = AMRegistryUtils.recordForDAGClientServer(appId, amUUID, dagClientServer); try { amRegistry.add(amRecord); } catch (Exception e) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java index d833fb1344..13cc27cbac 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java @@ -28,19 +28,18 @@ import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.client.registry.AMRegistry; -public class AMRegistryUtils { - public static AMRecord recordForDAGClientServer(ApplicationId appId, String opaqueId, DAGClientServer dagClientServer) { +public final class AMRegistryUtils { + + private AMRegistryUtils() {} + + public static AMRecord recordForDAGClientServer(ApplicationId appId, String opaqueId, + DAGClientServer dagClientServer) { InetSocketAddress address = dagClientServer.getBindAddress(); return new AMRecord(appId, address.getHostName(), address.getPort(), opaqueId); } public static AMRegistry createAMRegistry(Configuration conf) throws Exception { String tezAMRegistryClass = conf.get(TezConfiguration.TEZ_AM_REGISTRY_CLASS); - if(tezAMRegistryClass == null) { - return null; - } else { - AMRegistry amRegistry = ReflectionUtils.createClazzInstance(tezAMRegistryClass); - return amRegistry; - } + return tezAMRegistryClass == null ? null : ReflectionUtils.createClazzInstance(tezAMRegistryClass); } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java index 8285d1553c..75a7d6e7cf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java @@ -48,7 +48,7 @@ public void testAMRegistryFactory() throws Exception { String className = "org.apache.tez.dag.api.client.registry.TestAMRegistry$SkeletonAMRegistry"; conf.set(TezConfiguration.TEZ_AM_REGISTRY_CLASS, className); amRegistry = AMRegistryUtils.createAMRegistry(conf); - assertEquals(amRegistry.getClass().getName(), className); + assertEquals(className, amRegistry.getClass().getName()); } @Test(timeout = 5000) @@ -57,14 +57,10 @@ public void testRecordForDagServer() { when(dagClientServer.getBindAddress()).thenReturn(new InetSocketAddress("testhost", 1000)); ApplicationId appId = ApplicationId.newInstance(0, 1); String id = UUID.randomUUID().toString(); - AMRecord record = AMRegistryUtils.recordForDAGClientServer( - appId, - id, - dagClientServer - ); - assertEquals(record.getApplicationId(), appId); - assertEquals(record.getHost(), "testhost"); - assertEquals(record.getPort(), 1000); + AMRecord record = AMRegistryUtils.recordForDAGClientServer(appId, id, dagClientServer); + assertEquals(appId, record.getApplicationId()); + assertEquals("testhost", record.getHost()); + assertEquals(1000, record.getPort()); assertEquals(record.getId(), id); } @@ -82,8 +78,7 @@ public void testAMRegistryService() throws Exception { DAGAppMaster.initAmRegistry(appAttemptId.getApplicationId(), uuid, amRegistry, dagClientServer); dagClientServer.init(new Configuration()); dagClientServer.start(); - AMRecord record = - AMRegistryUtils.recordForDAGClientServer(appId, uuid, dagClientServer); + AMRecord record = AMRegistryUtils.recordForDAGClientServer(appId, uuid, dagClientServer); verify(amRegistry, times(1)).add(record); } finally { dagClientServer.stop(); @@ -97,5 +92,4 @@ public SkeletonAMRegistry() { @Override public void add(AMRecord server) throws Exception { } @Override public void remove(AMRecord server) throws Exception { } } - } From b05793a188da909f26808d9b24e6262a04636952 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 4 Sep 2025 10:29:27 +0200 Subject: [PATCH 3/3] review comments --- tez-api/pom.xml | 2 +- .../apache/tez/client/registry/AMRecord.java | 63 ++++++++++++++++++- .../org/apache/tez/dag/api/TezConstants.java | 2 +- .../api/client/registry/TestAMRegistry.java | 9 ++- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/tez-api/pom.xml b/tez-api/pom.xml index 7bd0200858..ff1f9d2e6a 100644 --- a/tez-api/pom.xml +++ b/tez-api/pom.xml @@ -75,7 +75,7 @@ org.apache.hadoop - hadoop-yarn-registry + hadoop-registry ${hadoop.version} diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java index fbfa795e6d..8453f9836c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java +++ b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java @@ -18,13 +18,21 @@ package org.apache.tez.client.registry; +import java.util.Objects; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.yarn.api.records.ApplicationId; /** - * Represents an instance of an AM (DAGClientServer) in the AM registry. + * Record representing an Application Master (AM) instance within Tez. + *

+ * This class can be serialized to and from a {@link ServiceRecord}, enabling + * storage and retrieval of AM metadata in external systems. Some constructors + * and methods are not necessarily used within the Tez codebase itself, but + * are part of the Tez API and intended for Tez clients that manage or interact + * with Tez unmanaged sessions. */ @InterfaceAudience.Public public class AMRecord { @@ -38,6 +46,19 @@ public class AMRecord { private final int port; private final String id; + /** + * Creates a new {@code AMRecord} with the given application ID, host, port, and identifier. + *

+ * If the provided identifier is {@code null}, it will be converted to an empty string. + *

+ * Although this constructor may not be used directly within Tez internals, + * it is part of the public API for Tez clients that handle unmanaged sessions. + * + * @param appId the {@link ApplicationId} of the Tez application + * @param host the hostname where the Application Master is running + * @param port the port number on which the Application Master is listening + * @param id an opaque identifier for the record; if {@code null}, defaults to an empty string + */ public AMRecord(ApplicationId appId, String host, int port, String id) { this.appId = appId; this.host = host; @@ -46,6 +67,16 @@ public AMRecord(ApplicationId appId, String host, int port, String id) { this.id = (id == null) ? "" : id; } + /** + * Copy constructor. + *

+ * Creates a new {@code AMRecord} by copying the fields of another instance. + *

+ * This constructor is mainly useful for client-side logic and session handling, + * and may not be invoked directly within the Tez codebase. + * + * @param other the {@code AMRecord} instance to copy + */ public AMRecord(AMRecord other) { this.appId = other.getApplicationId(); this.host = other.getHost(); @@ -53,6 +84,17 @@ public AMRecord(AMRecord other) { this.id = other.getId(); } + /** + * Constructs a new {@code AMRecord} from a {@link ServiceRecord}. + *

+ * This allows conversion from serialized metadata back into an in-memory {@code AMRecord}. + *

+ * While not always used in Tez internals, it exists in the Tez API so + * clients can reconstruct AM information when working with unmanaged sessions. + * + * @param serviceRecord the {@link ServiceRecord} containing AM metadata + * @throws IllegalArgumentException if required keys are missing or invalid + */ public AMRecord(ServiceRecord serviceRecord) { this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY)); this.host = serviceRecord.get(HOST_RECORD_KEY); @@ -78,6 +120,9 @@ public String getId() { @Override public boolean equals(Object other) { + if (this == other) { + return true; + } if (other instanceof AMRecord otherRecord) { return appId.equals(otherRecord.appId) && host.equals(otherRecord.host) @@ -88,6 +133,20 @@ public boolean equals(Object other) { } } + /** + * Converts this {@code AMRecord} into a {@link ServiceRecord}. + *

+ * The returned {@link ServiceRecord} contains the Application Master metadata + * (application ID, host, port, and opaque identifier) so that it can be stored + * in an external registry or retrieved later. + *

+ * While this method may not be directly used within Tez internals, + * it is part of the Tez public API and is intended for Tez clients + * that interact with unmanaged sessions or otherwise need to + * persist/reconstruct Application Master information. + * + * @return a {@link ServiceRecord} populated with the values of this {@code AMRecord} + */ public ServiceRecord toServiceRecord() { ServiceRecord serviceRecord = new ServiceRecord(); serviceRecord.set(APP_ID_RECORD_KEY, appId); @@ -99,6 +158,6 @@ public ServiceRecord toServiceRecord() { @Override public int hashCode() { - return appId.hashCode() * host.hashCode() * id.hashCode() + port; + return Objects.hash(appId, host, port, id); } } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 29477edf0c..71aff74801 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -103,7 +103,7 @@ public final class TezConstants { public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION"; //Arbitrary opaque ID to identify AM instances from AMRegistryClient - public static final String TEZ_AM_UUID = "UUID"; + public static final String TEZ_AM_UUID = "TEZ_AM_UUID"; private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn"; private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber"; diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java index 75a7d6e7cf..dc8cc4acf7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java @@ -19,8 +19,12 @@ package org.apache.tez.dag.api.client.registry; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.net.InetSocketAddress; import java.util.UUID; @@ -45,9 +49,10 @@ public void testAMRegistryFactory() throws Exception { Configuration conf = new Configuration(); AMRegistry amRegistry = AMRegistryUtils.createAMRegistry(conf); assertNull(amRegistry); - String className = "org.apache.tez.dag.api.client.registry.TestAMRegistry$SkeletonAMRegistry"; + String className = SkeletonAMRegistry.class.getName(); conf.set(TezConfiguration.TEZ_AM_REGISTRY_CLASS, className); amRegistry = AMRegistryUtils.createAMRegistry(conf); + assertNotNull(amRegistry); assertEquals(className, amRegistry.getClass().getName()); }