From f1f7740863c5a8215b4dd2c714c1dc700448dfdc Mon Sep 17 00:00:00 2001 From: fornaix Date: Fri, 29 Dec 2023 13:57:22 +0800 Subject: [PATCH 1/2] [fix](mysql) fix mysql channel infinite blocking (#28808) Call the Channels blocking method with timeout instead. Using session variables net_write_timeout and net_read_timeout as the timeout parameter. --- .../java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++---- .../java/org/apache/doris/qe/ConnectContext.java | 9 ++++++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index 5172b243bc2d98..25d23af70e211e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; @@ -75,11 +76,13 @@ public class MysqlChannel { protected volatile MysqlSerializer serializer; + private ConnectContext context; + protected MysqlChannel() { // For DummyMysqlChannel } - public MysqlChannel(StreamConnection connection) { + public MysqlChannel(StreamConnection connection, ConnectContext context) { Preconditions.checkNotNull(connection); this.sequenceId = 0; this.isSend = false; @@ -100,6 +103,7 @@ public MysqlChannel(StreamConnection connection) { this.defaultBuffer = ByteBuffer.allocate(16 * 1024); this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN); this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024); + this.context = context; } public void initSslBuffer() { @@ -182,7 +186,8 @@ protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException { } try { while (dstBuf.remaining() != 0) { - int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf); + int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(), + TimeUnit.SECONDS); // return -1 when remote peer close the channel if (ret == -1) { decryptData(dstBuf, isHeader); @@ -352,12 +357,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) { protected void realNetSend(ByteBuffer buffer) throws IOException { buffer = encryptData(buffer); long bufLen = buffer.remaining(); - long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer); + long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(), + TimeUnit.SECONDS); if (bufLen != writeLen) { throw new IOException("Write mysql packet failed.[write=" + writeLen + ", needToWrite=" + bufLen + "]"); } - Channels.flushBlocking(conn.getSinkChannel()); + Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS); isSend = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index d3994c04d9b753..4ebf493bdd1dab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -215,7 +215,7 @@ public ConnectContext(StreamConnection connection) { serverCapability = MysqlCapability.DEFAULT_CAPABILITY; isKilled = false; if (connection != null) { - mysqlChannel = new MysqlChannel(connection); + mysqlChannel = new MysqlChannel(connection, this); } else { mysqlChannel = new DummyMysqlChannel(); } @@ -672,5 +672,12 @@ public String getQueryIdentifier() { return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]"; } + public int getNetReadTimeout() { + return this.sessionVariable.getNetReadTimeout(); + } + + public int getNetWriteTimeout() { + return this.sessionVariable.getNetWriteTimeout(); + } } From b7f1f644fb5df0c2f75e0848f03d486e2f0b4dce Mon Sep 17 00:00:00 2001 From: camby Date: Sat, 23 Mar 2024 06:55:57 +0800 Subject: [PATCH 2/2] [conf](mysql) opt mysql network timeout to 600s #32545 --- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2a9a784173f4af..0b8ac0ccea749e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -384,11 +384,11 @@ public class SessionVariable implements Serializable, Writable { // The number of seconds to wait for a block to be written to a connection before aborting the write @VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT) - public int netWriteTimeout = 60; + public int netWriteTimeout = 600; // The number of seconds to wait for a block to be written to a connection before aborting the write @VariableMgr.VarAttr(name = NET_READ_TIMEOUT) - public int netReadTimeout = 60; + public int netReadTimeout = 600; // The current time zone @VariableMgr.VarAttr(name = TIME_ZONE, needForward = true)