Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ public class TezClient {
Map<String, LocalResource> cachedTezJarResources;
boolean usingTezArchiveDeploy = false;
private static final long SLEEP_FOR_READY = 500;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private final JobTokenSecretManager jobTokenSecretManager;
private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
@VisibleForTesting
final TezApiVersionInfo apiVersionInfo;
Expand Down Expand Up @@ -206,6 +205,8 @@ protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT);
Limits.setConfiguration(tezConf);

this.jobTokenSecretManager = new JobTokenSecretManager(tezConf);

LOG.info("Tez Client Version: " + apiVersionInfo.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;

Expand All @@ -37,11 +38,31 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
private final SecretKey masterKey;
private final Map<String, SecretKey> currentJobTokens;
private final Mac mac;


/**
* @param conf a mandatory configuration for JobTokenSecretManager to prevent algorithm mismatch
*/
public JobTokenSecretManager(Configuration conf) {
this(null, conf);
}

public JobTokenSecretManager(SecretKey key, Configuration conf) {
String algorithm = getAlgorithm(conf);
SecretKey masterKey = (key == null) ? generateSecret() : key;
this.currentJobTokens = new TreeMap<>();
try {
mac = Mac.getInstance(algorithm);
mac.init(masterKey);
} catch (NoSuchAlgorithmException nsa) {
throw new IllegalArgumentException("Can't find " + algorithm + " algorithm.", nsa);
} catch (InvalidKeyException ike) {
throw new IllegalArgumentException("Invalid key to " + algorithm + " computation", ike);
}
}

/**
* Convert the byte[] to a secret key
* @param key the byte[] to create the secret key from
Expand Down Expand Up @@ -72,27 +93,11 @@ public byte[] computeHash(byte[] msg) {
}
}

/**
* Default constructor
*/
public JobTokenSecretManager() {
this(null);
private String getAlgorithm(Configuration conf) {
// TODO: TEZ-4607: replace with CommonConfigurationKeysPublic enum values
return conf.get("hadoop.security.secret-manager.key-generator.algorithm", "HmacSHA1");
}

public JobTokenSecretManager(SecretKey key) {
this.masterKey = (key == null) ? generateSecret() : key;
this.currentJobTokens = new TreeMap<String, SecretKey>();
try {
mac = Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
mac.init(masterKey);
} catch (NoSuchAlgorithmException nsa) {
throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.", nsa);
} catch (InvalidKeyException ike) {
throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
}
}


/**
* Create a new password/secret for the given job token identifier.
* @param identifier the job token identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public void testAppSubmissionContextForPriority() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
TezClientUtils.createSessionToken(appId.toString(),
new JobTokenSecretManager(), credentials);
new JobTokenSecretManager(tezConf), credentials);
tezConf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
Map<String, LocalResource> m = new HashMap<String, LocalResource>();
tezConf.setInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, testpriority);
Expand Down Expand Up @@ -413,7 +413,7 @@ public void testSessionTokenInAmClc() throws IOException, YarnException {
.setTaskLaunchCmdOpts("initialLaunchOpts"));

Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(tezConf);
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
assertNotNull(jobToken);
Expand Down Expand Up @@ -447,7 +447,7 @@ public void testAMLoggingOptsSimple() throws IOException, YarnException {

ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(tezConf);
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("DAG-testAMLoggingOptsSimple");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
Expand Down Expand Up @@ -488,7 +488,7 @@ public void testAMLoggingOptsPerLogger() throws IOException, YarnException {

ApplicationId appId = ApplicationId.newInstance(1000, 1);
Credentials credentials = new Credentials();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(tezConf);
TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("DAG-testAMLoggingOptsPerLogger");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ public class DAGAppMaster extends AbstractService {
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
private TaskCommunicatorManagerInterface taskCommunicatorManager;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobTokenSecretManager jobTokenSecretManager;
private Token<JobTokenIdentifier> sessionToken;
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
Expand Down Expand Up @@ -520,6 +519,8 @@ protected void serviceInit(final Configuration conf) throws Exception {
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
addIfService(containerHeartbeatHandler, true);

jobTokenSecretManager = new JobTokenSecretManager(amConf);

sessionToken =
TokenCache.getSessionToken(amCredentials);
if (sessionToken == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ public void shutdown() {

protected void startRpcServer() {
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(conf);
jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);

server = new RPC.Builder(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ public void testBadProgress() throws Exception {

// create some sample AM credentials
Credentials amCreds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenSecretManager jtsm = new JobTokenSecretManager(conf);
JobTokenIdentifier identifier = new JobTokenIdentifier(
new Text(appId.toString()));
Token<JobTokenIdentifier> sessionToken =
Expand Down Expand Up @@ -608,7 +608,7 @@ private void testDagCredentials(boolean doMerge) throws IOException {

// create some sample AM credentials
Credentials amCreds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenSecretManager jtsm = new JobTokenSecretManager(conf);
JobTokenIdentifier identifier = new JobTokenIdentifier(
new Text(appId.toString()));
Token<JobTokenIdentifier> sessionToken =
Expand Down Expand Up @@ -764,7 +764,7 @@ public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) {

public static Credentials createCredentials() {
Credentials creds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenSecretManager jtsm = new JobTokenSecretManager(new TezConfiguration());
JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm);
TokenCache.setSessionToken(token, creds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public void testPortRange_NotSpecified() throws IOException, TezException {
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
"fakeIdentifier"));
Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
new JobTokenSecretManager());
new JobTokenSecretManager(conf));
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, credentials);
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
Expand All @@ -381,7 +381,7 @@ private boolean testPortRange(int port) {
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
"fakeIdentifier"));
Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
new JobTokenSecretManager());
new JobTokenSecretManager(conf));
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, credentials);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezDAGID;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected void build() {
deletionTracker.addNodeShufflePort(nodeId, shufflePort);
Assert.assertEquals("Unexpected number of entries in NodeIdShufflePortMap!",
1, deletionTracker.getNodeIdShufflePortMap().size());
deletionTracker.dagComplete(new TezDAGID(), new JobTokenSecretManager());
deletionTracker.dagComplete(new TezDAGID(), new JobTokenSecretManager(new TezConfiguration()));
Assert.assertEquals("Unexpected number of entries in NodeIdShufflePortMap after dagComplete!",
1, deletionTracker.getNodeIdShufflePortMap().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Thread newThread(Runnable r) {
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));

userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
secretManager = new JobTokenSecretManager(conf);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void testReduceProcessor() throws Exception {
LOG.info("Starting reduce...");

JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(dagName));
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(jobConf);
Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>(identifier,
jobTokenSecretManager);
shuffleToken.setService(identifier.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public Thread newThread(Runnable r) {
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
secretManager = new JobTokenSecretManager(conf);
recoverState(conf);
ServerBootstrap bootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
Expand Down Expand Up @@ -217,7 +218,8 @@ class MockShuffleHandlerWithFatalDiskError extends org.apache.tez.auxservices.Sh
"Could not find application_1234/240/output/attempt_1234_0/file.out.index";

private JobTokenSecretManager secretManager =
new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(getSecret().getBytes()));
new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(getSecret().getBytes()),
new TezConfiguration());

protected JobTokenSecretManager getSecretManager(){
return secretManager;
Expand Down Expand Up @@ -1209,7 +1211,7 @@ private static int getShuffleResponseCode(ShuffleHandler shuffle,
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
String encHash = SecureShuffleUtils.hashFromString(
SecureShuffleUtils.buildMsgFrom(url),
new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(jt.getPassword())));
new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(jt.getPassword()), new TezConfiguration()));
conn.addRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu
SecretKey shuffleSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(auxiliaryService));
this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret, conf);
this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public ShuffleScheduler(InputContext inputContext,
SecretKey jobTokenSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(auxiliaryService));
this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret, conf);

final ExecutorService fetcherRawExecutor;
if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private ShuffleManager createShuffleManager(InputContext inputContext) throws IO

DataOutputBuffer out = new DataOutputBuffer();
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
new JobTokenSecretManager(null));
new JobTokenSecretManager(new TezConfiguration()));
token.write(out);
doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private ShuffleManagerForTest createShuffleManager(

DataOutputBuffer out = new DataOutputBuffer();
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
new JobTokenSecretManager(null));
new JobTokenSecretManager(new TezConfiguration()));
token.write(out);
doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).
getServiceConsumerMetaData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private InputContext createTezInputContext() throws IOException {
doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
Token<JobTokenIdentifier>
sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
new JobTokenSecretManager());
new JobTokenSecretManager(new TezConfiguration()));
ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
Expand Down Expand Up @@ -87,7 +88,7 @@ private InputContext createTezInputContext() throws IOException {
ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
new JobTokenSecretManager());
new JobTokenSecretManager(new TezConfiguration()));
ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ private InputContext createTezInputContext() throws IOException {
doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
Token<JobTokenIdentifier>
sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
new JobTokenSecretManager());
new JobTokenSecretManager(new TezConfiguration()));
ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
Expand Down