Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,8 @@ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTranspo
try {
int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
int connectionTimeout = (int) MetastoreConf.getTimeVar(conf,
ConfVars.CLIENT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
if (useSSL) {
String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
if (trustStorePath.isEmpty()) {
Expand All @@ -751,10 +753,10 @@ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTranspo
String trustStoreAlgorithm =
MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
binaryTransport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
connectionTimeout, trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
} else {
binaryTransport = new TSocket(new TConfiguration(), store.getHost(), store.getPort(),
clientSocketTimeout);
clientSocketTimeout, connectionTimeout);
}
binaryTransport = createAuthBinaryTransport(store, binaryTransport);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ public enum ConfVars {
"has an infinite lifetime."),
CLIENT_SOCKET_TIMEOUT("metastore.client.socket.timeout", "hive.metastore.client.socket.timeout", 600,
TimeUnit.SECONDS, "MetaStore Client socket timeout in seconds"),
CLIENT_CONNECTION_TIMEOUT("metastore.client.connection.timeout", "hive.metastore.client.connection.timeout", 600,
TimeUnit.SECONDS, "MetaStore Client connection timeout in seconds"),
COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE("metastore.compactor.history.retention.did.not.initiate",
"hive.compactor.history.retention.did.not.initiate", 2,
new RangeValidator(0, 100), "Determines how many compaction records in state " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str
return thriftServerSocket;
}

public static TTransport getSSLSocket(String host, int port, int loginTimeout,
public static TTransport getSSLSocket(String host, int port, int socketTimeout, int connectionTimeout,
String trustStorePath, String trustStorePassWord, String trustStoreType,
String trustStoreAlgorithm) throws TTransportException {
TSSLTransportFactory.TSSLTransportParameters params =
Expand All @@ -282,8 +282,9 @@ public static TTransport getSSLSocket(String host, int port, int loginTimeout,
tStoreAlgorithm, tStoreType);
params.requireClientAuth(true);
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and
// SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params);
// connection timeout and SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
tSSLSocket.setConnectTimeout(connectionTimeout);
return getSSLSocketWithHttps(tSSLSocket);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ private void open() throws MetaException {
boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
int connectionTimeout = (int) MetastoreConf.getTimeVar(conf,
ConfVars.CLIENT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);

for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
for (URI store : metastoreUris) {
Expand All @@ -466,7 +468,7 @@ private void open() throws MetaException {

// Create an SSL socket and connect
transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm );
connectionTimeout, trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
} catch(IOException e) {
throw new IllegalArgumentException(e);
Expand All @@ -476,7 +478,8 @@ private void open() throws MetaException {
}
} else {
try {
transport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), clientSocketTimeout);
transport = new TSocket(new TConfiguration(),
store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout);
} catch (TTransportException e) {
tte = e;
throw new MetaException(e.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hive.metastore;

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -27,9 +29,11 @@
import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -43,9 +47,10 @@ public class TestHiveMetaStoreTimeout {
protected static HiveMetaStoreClient client;
protected static Configuration conf;
protected static Warehouse warehouse;
protected static int port;

@BeforeClass
public static void setUp() throws Exception {
public static void startMetaStoreServer() throws Exception {
HMSHandler.testTimeoutEnabled = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This test code hacks HMSHandler class, I will refactor this in a new PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to repeat the whole setup for every test or it's possible to extract just the HMSHandler part?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed! Have separated the start of MetaStore Server into a BeforeClass method.

conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setClass(conf, ConfVars.EXPRESSION_PROXY_CLASS,
Expand All @@ -54,19 +59,25 @@ public static void setUp() throws Exception {
TimeUnit.MILLISECONDS);
MetaStoreTestUtils.setConfForStandloneMode(conf);
warehouse = new Warehouse(conf);
client = new HiveMetaStoreClient(conf);
port = MetaStoreTestUtils.startMetaStoreWithRetry(conf);
MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port);
MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);
}

@AfterClass
public static void tearDown() throws Exception {
public static void tearDown() {
HMSHandler.testTimeoutEnabled = false;
try {
client.close();
} catch (Throwable e) {
System.err.println("Unable to close metastore");
System.err.println(StringUtils.stringifyException(e));
throw e;
}
}

@Before
public void setup() throws MetaException {
client = new HiveMetaStoreClient(conf);
}

@After
public void cleanup() {
client.close();
client = null;
}

@Test
Expand Down Expand Up @@ -96,9 +107,8 @@ public void testTimeout() throws Exception {
try {
client.createDatabase(db);
Assert.fail("should throw timeout exception.");
} catch (MetaException e) {
Assert.assertTrue("unexpected MetaException", e.getMessage().contains("Timeout when " +
"executing method: create_database"));
} catch (TTransportException e) {
Assert.assertTrue("unexpected Exception", e.getMessage().contains("Read timed out"));
}

// restore
Expand All @@ -117,7 +127,7 @@ public void testResetTimeout() throws Exception {
.build(conf);
try {
client.createDatabase(db);
} catch (MetaException e) {
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have changed to use the remote MetaStore server in this test class, so the timeout exception was wrapped in TTransportException rather than MetaException.

Assert.fail("should not throw timeout exception: " + e.getMessage());
}
client.dropDatabase(dbName, true, true);
Expand All @@ -130,13 +140,28 @@ public void testResetTimeout() throws Exception {
try {
client.createDatabase(db);
Assert.fail("should throw timeout exception.");
} catch (MetaException e) {
Assert.assertTrue("unexpected MetaException", e.getMessage().contains("Timeout when " +
"executing method: create_database"));
} catch (TTransportException e) {
Assert.assertTrue("unexpected Exception", e.getMessage().contains("Read timed out"));
}
}

// restore
client.dropDatabase(dbName, true, true);
client.setMetaConf(ConfVars.CLIENT_SOCKET_TIMEOUT.getVarname(), "10s");
@Test
public void testConnectionTimeout() throws Exception {
Configuration newConf = new Configuration(conf);
MetastoreConf.setTimeVar(newConf, ConfVars.CLIENT_CONNECTION_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
// fake host to mock connection time out
MetastoreConf.setVar(newConf, ConfVars.THRIFT_URIS, "thrift://1.1.1.1:" + port);
MetastoreConf.setLongVar(newConf, ConfVars.THRIFT_CONNECTION_RETRIES, 1);

Future<Void> future = Executors.newSingleThreadExecutor().submit(() -> {
try(HiveMetaStoreClient c = new HiveMetaStoreClient(newConf)) {
Assert.fail("should throw connection timeout exception.");
} catch (MetaException e) {
Assert.assertTrue("unexpected Exception", e.getMessage().contains("connect timed out"));
}
return null;
});
future.get(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ private TTransport open(Configuration conf, @NotNull URI uri) throws
boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
int connectionTimeout = (int) MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.CLIENT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);

LOG.debug("Connecting to {}, framedTransport = {}", uri, useFramedTransport);

Expand All @@ -406,7 +408,7 @@ private TTransport open(Configuration conf, @NotNull URI uri) throws

// Sasl/SSL code is copied from HiveMetastoreCLient
if (!useSSL) {
transport = new TSocket(new TConfiguration(),host, port, clientSocketTimeout);
transport = new TSocket(new TConfiguration(),host, port, clientSocketTimeout, connectionTimeout);
} else {
String trustStorePath = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PATH).trim();
if (trustStorePath.isEmpty()) {
Expand All @@ -421,7 +423,7 @@ private TTransport open(Configuration conf, @NotNull URI uri) throws
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();

// Create an SSL socket and connect
transport = SecurityUtils.getSSLSocket(host, port, clientSocketTimeout,
transport = SecurityUtils.getSSLSocket(host, port, clientSocketTimeout, connectionTimeout,
trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
LOG.info("Opened an SSL connection to metastore, current connections");
}
Expand Down