Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -75,11 +76,13 @@ public class MysqlChannel {

protected volatile MysqlSerializer serializer;

private ConnectContext context;

protected MysqlChannel() {
// For DummyMysqlChannel
}

public MysqlChannel(StreamConnection connection) {
public MysqlChannel(StreamConnection connection, ConnectContext context) {
Preconditions.checkNotNull(connection);
this.sequenceId = 0;
this.isSend = false;
Expand All @@ -100,6 +103,7 @@ public MysqlChannel(StreamConnection connection) {
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
this.context = context;
}

public void initSslBuffer() {
Expand Down Expand Up @@ -182,7 +186,8 @@ protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException {
}
try {
while (dstBuf.remaining() != 0) {
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(),
TimeUnit.SECONDS);
// return -1 when remote peer close the channel
if (ret == -1) {
decryptData(dstBuf, isHeader);
Expand Down Expand Up @@ -352,12 +357,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
TimeUnit.SECONDS);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
}
Channels.flushBlocking(conn.getSinkChannel());
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public ConnectContext(StreamConnection connection) {
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
isKilled = false;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection);
mysqlChannel = new MysqlChannel(connection, this);
} else {
mysqlChannel = new DummyMysqlChannel();
}
Expand Down Expand Up @@ -672,5 +672,12 @@ public String getQueryIdentifier() {
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
}

public int getNetReadTimeout() {
return this.sessionVariable.getNetReadTimeout();
}

public int getNetWriteTimeout() {
return this.sessionVariable.getNetWriteTimeout();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,11 @@ public class SessionVariable implements Serializable, Writable {

// The number of seconds to wait for a block to be written to a connection before aborting the write
@VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT)
public int netWriteTimeout = 60;
public int netWriteTimeout = 600;

// The number of seconds to wait for a block to be written to a connection before aborting the write
@VariableMgr.VarAttr(name = NET_READ_TIMEOUT)
public int netReadTimeout = 60;
public int netReadTimeout = 600;

// The current time zone
@VariableMgr.VarAttr(name = TIME_ZONE, needForward = true)
Expand Down