Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ public String getSqlDialect() {
return getString(ConsumerConstant.SQL_DIALECT_KEY);
}

public long getHeartbeatIntervalMs() {
return getLongOrDefault(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE);
}

public int getConnectionTimeoutInMs() {
return getIntOrDefault(
ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE);
}

public void setConsumerId(final String consumerId) {
attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class ConsumerConstant {

public static final String THRIFT_MAX_FRAME_SIZE_KEY = "thrift-max-frame-size";

public static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms";
public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0;

public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism";
public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
public ZoneId zoneId = null;
public int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
public int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
public int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;

Check warning on line 44 in iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make connectionTimeoutInMs a static final constant or non-public and provide accessors if needed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzihq6-NL55AYaoznVR&open=AZzihq6-NL55AYaoznVR&pullRequest=17293
// this field only take effect in write request, nothing to do with any other type requests,
// like query, load and so on.
// if set to true, it means that we may redirect the write request to its corresponding leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ public Session(AbstractSessionBuilder builder) {
this.zoneId = builder.zoneId;
this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
this.version = builder.version;
this.useSSL = builder.useSSL;
this.trustStore = builder.trustStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public SubscriptionSessionWrapper(final AbstractSessionBuilder builder) {
}

public void open() throws IoTDBConnectionException {
super.open();
super.open(enableThriftRpcCompaction, connectionTimeoutInMs);
}

public void close() throws IoTDBConnectionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public SubscriptionTableSessionBuilder thriftMaxFrameSize(final int thriftMaxFra
return this;
}

public SubscriptionTableSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) {
super.connectionTimeoutInMs = connectionTimeoutInMs;
return this;
}

public ISubscriptionTableSession build() throws IoTDBConnectionException {
final ISubscriptionTableSession session = new SubscriptionTableSession(this);
session.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public SubscriptionTreeSessionBuilder thriftMaxFrameSize(final int thriftMaxFram
return this;
}

public SubscriptionTreeSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) {
super.connectionTimeoutInMs = connectionTimeoutInMs;
return this;
}

public ISubscriptionTreeSession build() {
return new SubscriptionTreeSession(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet = new HashSet<>();

private final int thriftMaxFrameSize;
private final int connectionTimeoutInMs;
private final int maxPollParallelism;

@SuppressWarnings("java:S3077")
Expand Down Expand Up @@ -187,6 +188,7 @@
this.fileSaveFsync = builder.fileSaveFsync;

this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
this.maxPollParallelism = builder.maxPollParallelism;
}

Expand Down Expand Up @@ -232,6 +234,11 @@
properties.getOrDefault(
ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE))
.connectionTimeoutInMs(
(Integer)
properties.getOrDefault(
ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS))
.maxPollParallelism(
(Integer)
properties.getOrDefault(
Expand Down Expand Up @@ -376,13 +383,15 @@

/////////////////////////////// subscription provider ///////////////////////////////

protected abstract AbstractSubscriptionProvider constructSubscriptionProvider(

Check warning on line 386 in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzihq1bNL55AYaoznVN&open=AZzihq1bNL55AYaoznVN&pullRequest=17293
final TEndPoint endPoint,
final String username,
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize);
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs);

AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
throws SubscriptionException {
Expand All @@ -393,7 +402,9 @@
this.password,
this.consumerId,
this.consumerGroupId,
this.thriftMaxFrameSize);
this.thriftMaxFrameSize,
this.heartbeatIntervalMs,
this.connectionTimeoutInMs);
try {
provider.handshake();
} catch (final Exception e) {
Expand Down Expand Up @@ -1428,6 +1439,7 @@
result.put("fileSaveFsync", String.valueOf(fileSaveFsync));
result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString());
result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize));
result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs));
result.put("maxPollParallelism", String.valueOf(maxPollParallelism));
result.put("subscribedTopics", subscribedTopics.toString());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class AbstractSubscriptionConsumerBuilder {
protected boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;

protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
protected int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
protected int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;

public AbstractSubscriptionConsumerBuilder host(final String host) {
Expand Down Expand Up @@ -120,6 +121,12 @@ public AbstractSubscriptionConsumerBuilder thriftMaxFrameSize(final int thriftMa
return this;
}

public AbstractSubscriptionConsumerBuilder connectionTimeoutInMs(
final int connectionTimeoutInMs) {
this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0);
return this;
}

public AbstractSubscriptionConsumerBuilder maxPollParallelism(final int maxPollParallelism) {
// Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a
// single thread to execute poll whenever there are idle resources available, thereby
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,30 +90,42 @@

private final String username;
private final String password;
private final long heartbeatIntervalMs;
private final int connectionTimeoutInMs;

protected abstract AbstractSessionBuilder constructSubscriptionSessionBuilder(
final String host,
final int port,
final String username,
final String password,
final int thriftMaxFrameSize);
final int thriftMaxFrameSize,
final int connectionTimeoutInMs);

protected AbstractSubscriptionProvider(

Check warning on line 104 in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzihq3sNL55AYaoznVO&open=AZzihq3sNL55AYaoznVO&pullRequest=17293
final TEndPoint endPoint,
final String username,
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize) {
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
this.session =
new SubscriptionSessionWrapper(
constructSubscriptionSessionBuilder(
endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize));
endPoint.ip,
endPoint.port,
username,
password,
thriftMaxFrameSize,
connectionTimeoutInMs));
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
this.username = username;
this.password = password;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}

SubscriptionSessionConnection getSessionConnection() throws IoTDBConnectionException {
Expand Down Expand Up @@ -164,6 +176,10 @@
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, session.getSqlDialect());
consumerAttributes.put(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
consumerAttributes.put(
ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, String.valueOf(connectionTimeoutInMs));

final PipeSubscribeHandshakeResp resp =
handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public AbstractSubscriptionPullConsumerBuilder thriftMaxFrameSize(final int thri
return this;
}

@Override
public AbstractSubscriptionPullConsumerBuilder connectionTimeoutInMs(
final int connectionTimeoutInMs) {
super.connectionTimeoutInMs(connectionTimeoutInMs);
return this;
}

@Override
public AbstractSubscriptionPullConsumerBuilder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ public AbstractSubscriptionPushConsumerBuilder thriftMaxFrameSize(final int thri
return this;
}

@Override
public AbstractSubscriptionPushConsumerBuilder connectionTimeoutInMs(
final int connectionTimeoutInMs) {
super.connectionTimeoutInMs(connectionTimeoutInMs);
return this;
}

@Override
public AbstractSubscriptionPushConsumerBuilder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,24 @@

final class SubscriptionTableProvider extends AbstractSubscriptionProvider {

SubscriptionTableProvider(

Check warning on line 29 in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzihq5TNL55AYaoznVQ&open=AZzihq5TNL55AYaoznVQ&pullRequest=17293
final TEndPoint endPoint,
final String username,
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize) {
super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize);
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
super(
endPoint,
username,
password,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
}

@Override
Expand All @@ -42,12 +52,14 @@
final int port,
final String username,
final String password,
final int thriftMaxFrameSize) {
final int thriftMaxFrameSize,
final int connectionTimeoutInMs) {
return new SubscriptionTableSessionBuilder()
.host(host)
.port(port)
.username(username)
.password(password)
.thriftMaxFrameSize(thriftMaxFrameSize);
.thriftMaxFrameSize(thriftMaxFrameSize)
.connectionTimeoutInMs(connectionTimeoutInMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize) {
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
return new SubscriptionTableProvider(
endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize);
endPoint,
username,
password,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
}

/////////////////////////////// ctor ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ public SubscriptionTablePullConsumerBuilder thriftMaxFrameSize(final int thriftM
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder connectionTimeoutInMs(
final int connectionTimeoutInMs) {
super.connectionTimeoutInMs(connectionTimeoutInMs);
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize) {
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
return new SubscriptionTableProvider(
endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize);
endPoint,
username,
password,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
}

/////////////////////////////// ctor ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public SubscriptionTablePushConsumerBuilder thriftMaxFrameSize(final int thriftM
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder connectionTimeoutInMs(
final int connectionTimeoutInMs) {
super.connectionTimeoutInMs(connectionTimeoutInMs);
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,24 @@

final class SubscriptionTreeProvider extends AbstractSubscriptionProvider {

SubscriptionTreeProvider(

Check warning on line 29 in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzihq42NL55AYaoznVP&open=AZzihq42NL55AYaoznVP&pullRequest=17293
final TEndPoint endPoint,
final String username,
final String password,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize) {
super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize);
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
super(
endPoint,
username,
password,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
}

@Override
Expand All @@ -42,12 +52,14 @@
final int port,
final String username,
final String password,
final int thriftMaxFrameSize) {
final int thriftMaxFrameSize,
final int connectionTimeoutInMs) {
return new SubscriptionTreeSessionBuilder()
.host(host)
.port(port)
.username(username)
.password(password)
.thriftMaxFrameSize(thriftMaxFrameSize);
.thriftMaxFrameSize(thriftMaxFrameSize)
.connectionTimeoutInMs(connectionTimeoutInMs);
}
}
Loading
Loading