Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hbase-mapreduce/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public static void initCredentialsForCluster(Job job, String quorumAddress)
*/
public static void initCredentialsForCluster(Job job, Configuration conf)
throws IOException {
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
UserProvider userProvider = UserProvider.instantiate(conf);
if (userProvider.isHBaseSecurityEnabled()) {
try {
Connection peerConn = ConnectionFactory.createConnection(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,39 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import static org.junit.Assert.assertTrue;
import java.io.Closeable;
import java.io.File;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -38,11 +59,17 @@
*/
@Category({MapReduceTests.class, MediumTests.class})
public class TestTableMapReduceUtil {
private static final String HTTP_PRINCIPAL = "HTTP/localhost";

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

@After
public void after() {
SaslClientAuthenticationProviders.reset();
}

/*
* initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
* the method depends on an online cluster.
Expand All @@ -51,10 +78,11 @@ public class TestTableMapReduceUtil {
@Test
public void testInitTableMapperJob1() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
Job job = Job.getInstance(configuration, "tableName");
// test
TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
Text.class, job, false, WALInputFormat.class);
TableMapReduceUtil.initTableMapperJob(
"Table", new Scan(), Import.Importer.class, Text.class, Text.class, job,
false, WALInputFormat.class);
assertEquals(WALInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
Expand All @@ -66,9 +94,10 @@ public void testInitTableMapperJob1() throws Exception {
@Test
public void testInitTableMapperJob2() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
Job job = Job.getInstance(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
Text.class, job, false, WALInputFormat.class);
assertEquals(WALInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
Expand All @@ -80,9 +109,10 @@ public void testInitTableMapperJob2() throws Exception {
@Test
public void testInitTableMapperJob3() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job);
Job job = Job.getInstance(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
Text.class, job);
assertEquals(TableInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
Expand All @@ -94,14 +124,166 @@ public void testInitTableMapperJob3() throws Exception {
@Test
public void testInitTableMapperJob4() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job, false);
Job job = Job.getInstance(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
Text.class, job, false);
assertEquals(TableInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
}

private static Closeable startSecureMiniCluster(
HBaseTestingUtil util, MiniKdc kdc, String principal) throws Exception {
Configuration conf = util.getConfiguration();

SecureTestUtil.enableSecurity(conf);
VisibilityTestUtil.enableVisiblityLabels(conf);
SecureTestUtil.verifyConfiguration(conf);

conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
AccessController.class.getName() + ',' + TokenProvider.class.getName());

HBaseKerberosUtils.setSecuredConfiguration(conf,
principal + '@' + kdc.getRealm(), HTTP_PRINCIPAL + '@' + kdc.getRealm());

util.startMiniCluster();
try {
util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
} catch (Exception e) {
util.shutdownMiniCluster();
throw e;
}

return util::shutdownMiniCluster;
}

@Test public void testInitCredentialsForCluster1() throws Exception {
HBaseTestingUtil util1 = new HBaseTestingUtil();
HBaseTestingUtil util2 = new HBaseTestingUtil();

util1.startMiniCluster();
try {
util2.startMiniCluster();
try {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);

TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

Credentials credentials = job.getCredentials();
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
assertTrue(tokens.isEmpty());
} finally {
util2.shutdownMiniCluster();
}
} finally {
util1.shutdownMiniCluster();
}
}

@Test @SuppressWarnings("unchecked") public void testInitCredentialsForCluster2()
throws Exception {
HBaseTestingUtil util1 = new HBaseTestingUtil();
HBaseTestingUtil util2 = new HBaseTestingUtil();

File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
MiniKdc kdc = util1.setupMiniKdc(keytab);
try {
String username = UserGroupInformation.getLoginUser().getShortUserName();
String userPrincipal = username + "/localhost";
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);

TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

Credentials credentials = job.getCredentials();
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
assertEquals(1, tokens.size());

String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
Token<AuthenticationTokenIdentifier> tokenForCluster =
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
assertEquals(userPrincipal + '@' + kdc.getRealm(),
tokenForCluster.decodeIdentifier().getUsername());
}
} finally {
kdc.stop();
}
}

@Test public void testInitCredentialsForCluster3() throws Exception {
HBaseTestingUtil util1 = new HBaseTestingUtil();

File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
MiniKdc kdc = util1.setupMiniKdc(keytab);
try {
String username = UserGroupInformation.getLoginUser().getShortUserName();
String userPrincipal = username + "/localhost";
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
HBaseTestingUtil util2 = new HBaseTestingUtil();
// Assume util2 is insecure cluster
// Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
// once

Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);

TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

Credentials credentials = job.getCredentials();
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
assertTrue(tokens.isEmpty());
}
} finally {
kdc.stop();
}
}

@Test @SuppressWarnings("unchecked") public void testInitCredentialsForCluster4()
throws Exception {
HBaseTestingUtil util1 = new HBaseTestingUtil();
// Assume util1 is insecure cluster
// Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once

HBaseTestingUtil util2 = new HBaseTestingUtil();
File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
MiniKdc kdc = util2.setupMiniKdc(keytab);
try {
String username = UserGroupInformation.getLoginUser().getShortUserName();
String userPrincipal = username + "/localhost";
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);

TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

Credentials credentials = job.getCredentials();
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
assertEquals(1, tokens.size());

String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
Token<AuthenticationTokenIdentifier> tokenForCluster =
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
assertEquals(userPrincipal + '@' + kdc.getRealm(),
tokenForCluster.decodeIdentifier().getUsername());
}
} finally {
kdc.stop();
}
}
}