Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class MysqlChannel {
// mysql flag CLIENT_DEPRECATE_EOF
private boolean clientDeprecatedEOF;

private ConnectContext context;

protected MysqlChannel() {
// For DummyMysqlChannel
}
Expand All @@ -91,7 +94,7 @@ public boolean clientDeprecatedEOF() {
return clientDeprecatedEOF;
}

public MysqlChannel(StreamConnection connection) {
public MysqlChannel(StreamConnection connection, ConnectContext context) {
Preconditions.checkNotNull(connection);
this.sequenceId = 0;
this.isSend = false;
Expand All @@ -113,6 +116,7 @@ public MysqlChannel(StreamConnection connection) {
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
this.context = context;
}

public void initSslBuffer() {
Expand Down Expand Up @@ -195,7 +199,8 @@ protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException {
}
try {
while (dstBuf.remaining() != 0) {
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(),
TimeUnit.SECONDS);
// return -1 when remote peer close the channel
if (ret == -1) {
decryptData(dstBuf, isHeader);
Expand Down Expand Up @@ -365,12 +370,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
TimeUnit.SECONDS);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
}
Channels.flushBlocking(conn.getSinkChannel());
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public ConnectContext(StreamConnection connection) {
connectType = ConnectType.MYSQL;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection);
mysqlChannel = new MysqlChannel(connection, this);
} else {
mysqlChannel = new DummyMysqlChannel();
}
Expand Down Expand Up @@ -1033,5 +1033,13 @@ public boolean isSkipAuth() {
public void setSkipAuth(boolean skipAuth) {
this.skipAuth = skipAuth;
}

public int getNetReadTimeout() {
return this.sessionVariable.getNetReadTimeout();
}

public int getNetWriteTimeout() {
return this.sessionVariable.getNetWriteTimeout();
}
}