From cd0d62333d704c88b96744c2adc1c4e094971f33 Mon Sep 17 00:00:00 2001 From: fornaix Date: Thu, 21 Dec 2023 17:22:34 +0800 Subject: [PATCH] [fix](mysql) fix mysql channel infinite blocking --- .../java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++---- .../java/org/apache/doris/qe/ConnectContext.java | 10 +++++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index 5eaee47fa4b377..8e7c5f79ffd0ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; @@ -79,6 +80,8 @@ public class MysqlChannel { // mysql flag CLIENT_DEPRECATE_EOF private boolean clientDeprecatedEOF; + private ConnectContext context; + protected MysqlChannel() { // For DummyMysqlChannel } @@ -91,7 +94,7 @@ public boolean clientDeprecatedEOF() { return clientDeprecatedEOF; } - public MysqlChannel(StreamConnection connection) { + public MysqlChannel(StreamConnection connection, ConnectContext context) { Preconditions.checkNotNull(connection); this.sequenceId = 0; this.isSend = false; @@ -113,6 +116,7 @@ public MysqlChannel(StreamConnection connection) { this.defaultBuffer = ByteBuffer.allocate(16 * 1024); this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN); this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024); + this.context = context; } public void initSslBuffer() { @@ -195,7 +199,8 @@ protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException { } try { while (dstBuf.remaining() != 0) { - int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf); + int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(), + TimeUnit.SECONDS); // return -1 when remote peer close the channel if (ret == -1) { decryptData(dstBuf, isHeader); @@ -365,12 +370,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) { protected void realNetSend(ByteBuffer buffer) throws IOException { buffer = encryptData(buffer); long bufLen = buffer.remaining(); - long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer); + long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(), + TimeUnit.SECONDS); if (bufLen != writeLen) { throw new IOException("Write mysql packet failed.[write=" + writeLen + ", needToWrite=" + bufLen + "]"); } - Channels.flushBlocking(conn.getSinkChannel()); + Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS); isSend = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index a3590a23c4bee2..8a1a16999b6a14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -327,7 +327,7 @@ public ConnectContext(StreamConnection connection) { connectType = ConnectType.MYSQL; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; if (connection != null) { - mysqlChannel = new MysqlChannel(connection); + mysqlChannel = new MysqlChannel(connection, this); } else { mysqlChannel = new DummyMysqlChannel(); } @@ -1033,5 +1033,13 @@ public boolean isSkipAuth() { public void setSkipAuth(boolean skipAuth) { this.skipAuth = skipAuth; } + + public int getNetReadTimeout() { + return this.sessionVariable.getNetReadTimeout(); + } + + public int getNetWriteTimeout() { + return this.sessionVariable.getNetWriteTimeout(); + } }