Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 126 additions & 21 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TiSession implements AutoCloseable {
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
private boolean isClosed = false;
private volatile boolean isClosed = false;
private MetricsServer metricsServer;
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;

Expand Down Expand Up @@ -106,22 +106,30 @@ public static TiSession getInstance(TiConfiguration conf) {
}

public RawKVClient createRawClient() {
checkIsClosed();

RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new RawKVClient(this, builder);
}

public KVClient createKVClient() {
checkIsClosed();

RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new KVClient(conf, builder);
}

public TxnKVClient createTxnClient() {
checkIsClosed();

return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
}

public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
checkIsClosed();

RegionStoreClient.RegionStoreClientBuilder res = clientBuilder;
if (res == null) {
synchronized (this) {
Expand All @@ -137,6 +145,8 @@ public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder()
}

public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
checkIsClosed();

ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
if (res == null) {
synchronized (this) {
Expand All @@ -156,18 +166,26 @@ public TiConfiguration getConf() {
}

public TiTimestamp getTimestamp() {
checkIsClosed();

return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
}

public Snapshot createSnapshot() {
checkIsClosed();

return new Snapshot(getTimestamp(), this);
}

public Snapshot createSnapshot(TiTimestamp ts) {
checkIsClosed();

return new Snapshot(ts, this);
}

public PDClient getPDClient() {
checkIsClosed();

PDClient res = client;
if (res == null) {
synchronized (this) {
Expand All @@ -181,6 +199,8 @@ public PDClient getPDClient() {
}

public Catalog getCatalog() {
checkIsClosed();

Catalog res = catalog;
if (res == null) {
synchronized (this) {
Expand All @@ -194,6 +214,8 @@ public Catalog getCatalog() {
}

public RegionManager getRegionManager() {
checkIsClosed();

RegionManager res = regionManager;
if (res == null) {
synchronized (this) {
Expand All @@ -207,6 +229,8 @@ public RegionManager getRegionManager() {
}

public ExecutorService getThreadPoolForIndexScan() {
checkIsClosed();

ExecutorService res = indexScanThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -226,6 +250,8 @@ public ExecutorService getThreadPoolForIndexScan() {
}

public ExecutorService getThreadPoolForTableScan() {
checkIsClosed();

ExecutorService res = tableScanThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -242,6 +268,8 @@ public ExecutorService getThreadPoolForTableScan() {
}

public ExecutorService getThreadPoolForBatchPut() {
checkIsClosed();

ExecutorService res = batchPutThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -261,6 +289,8 @@ public ExecutorService getThreadPoolForBatchPut() {
}

public ExecutorService getThreadPoolForBatchGet() {
checkIsClosed();

ExecutorService res = batchGetThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -280,6 +310,8 @@ public ExecutorService getThreadPoolForBatchGet() {
}

public ExecutorService getThreadPoolForBatchDelete() {
checkIsClosed();

ExecutorService res = batchDeleteThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -299,6 +331,8 @@ public ExecutorService getThreadPoolForBatchDelete() {
}

public ExecutorService getThreadPoolForBatchScan() {
checkIsClosed();

ExecutorService res = batchScanThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -318,6 +352,8 @@ public ExecutorService getThreadPoolForBatchScan() {
}

public ExecutorService getThreadPoolForDeleteRange() {
checkIsClosed();

ExecutorService res = deleteRangeThreadPool;
if (res == null) {
synchronized (this) {
Expand All @@ -338,6 +374,8 @@ public ExecutorService getThreadPoolForDeleteRange() {

@VisibleForTesting
public ChannelFactory getChannelFactory() {
checkIsClosed();

return channelFactory;
}

Expand All @@ -347,6 +385,8 @@ public ChannelFactory getChannelFactory() {
* @return a SwitchTiKVModeClient
*/
public SwitchTiKVModeClient getSwitchTiKVModeClient() {
checkIsClosed();

return new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
}

Expand All @@ -363,6 +403,8 @@ public void splitRegionAndScatter(
int splitRegionBackoffMS,
int scatterRegionBackoffMS,
int scatterWaitMS) {
checkIsClosed();

logger.info(String.format("split key's size is %d", splitKeys.size()));
long startMS = System.currentTimeMillis();

Expand Down Expand Up @@ -412,6 +454,8 @@ public void splitRegionAndScatter(
* @param splitKeys
*/
public void splitRegionAndScatter(List<byte[]> splitKeys) {
checkIsClosed();

int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
Expand Down Expand Up @@ -475,50 +519,111 @@ private List<Metapb.Region> splitRegion(
return regions;
}

@Override
public synchronized void close() throws Exception {
private void checkIsClosed() {
if (isClosed) {
logger.warn("this TiSession is already closed!");
return;
throw new RuntimeException("this TiSession is closed!");
}
}

public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
shutdown(false);

long startMS = System.currentTimeMillis();
while (true) {
if (isTerminatedExecutorServices()) {
cleanAfterTerminated();
return;
}

if (System.currentTimeMillis() - startMS > timeoutMS) {
shutdown(true);
return;
}
Thread.sleep(500);
}
}

@Override
public synchronized void close() throws Exception {
shutdown(true);
}

if (metricsServer != null) {
metricsServer.close();
private synchronized void shutdown(boolean now) throws Exception {
if (!isClosed) {
isClosed = true;
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
}

if (metricsServer != null) {
metricsServer.close();
}
}

isClosed = true;
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
if (now) {
shutdownNowExecutorServices();
cleanAfterTerminated();
} else {
shutdownExecutorServices();
}
}

private synchronized void cleanAfterTerminated() throws InterruptedException {
if (regionManager != null) {
regionManager.close();
}
if (client != null) {
client.close();
}
if (catalog != null) {
catalog.close();
}
}

private List<ExecutorService> getExecutorServices() {
List<ExecutorService> executorServiceList = new ArrayList<>();
if (tableScanThreadPool != null) {
tableScanThreadPool.shutdownNow();
executorServiceList.add(tableScanThreadPool);
}
if (indexScanThreadPool != null) {
indexScanThreadPool.shutdownNow();
executorServiceList.add(indexScanThreadPool);
}
if (batchGetThreadPool != null) {
batchGetThreadPool.shutdownNow();
executorServiceList.add(batchGetThreadPool);
}
if (batchPutThreadPool != null) {
batchPutThreadPool.shutdownNow();
executorServiceList.add(batchPutThreadPool);
}
if (batchDeleteThreadPool != null) {
batchDeleteThreadPool.shutdownNow();
executorServiceList.add(batchDeleteThreadPool);
}
if (batchScanThreadPool != null) {
batchScanThreadPool.shutdownNow();
executorServiceList.add(batchScanThreadPool);
}
if (deleteRangeThreadPool != null) {
deleteRangeThreadPool.shutdownNow();
executorServiceList.add(deleteRangeThreadPool);
}
if (client != null) {
getPDClient().close();
return executorServiceList;
}

private void shutdownExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
executorService.shutdown();
}
if (catalog != null) {
getCatalog().close();
}

private void shutdownNowExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
executorService.shutdownNow();
}
}

private boolean isTerminatedExecutorServices() {
for (ExecutorService executorService : getExecutorServices()) {
if (!executorService.isTerminated()) {
return false;
}
}
return true;
}
}
Loading