diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 100de1f112623..0b5c5a5547794 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -80,6 +80,18 @@ public String getSqlDialect() { return getString(ConsumerConstant.SQL_DIALECT_KEY); } + public long getHeartbeatIntervalMs() { + return getLongOrDefault( + ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, + ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE); + } + + public int getConnectionTimeoutInMs() { + return getIntOrDefault( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, + ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE); + } + public void setConsumerId(final String consumerId) { attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index dd0c583e40d66..a0e6f9ed22801 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -55,6 +55,9 @@ public class ConsumerConstant { public static final String THRIFT_MAX_FRAME_SIZE_KEY = "thrift-max-frame-size"; + public static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms"; + public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0; + public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism"; public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java index a8f18ce32bc73..f6132d8041f1b 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java @@ -41,6 +41,7 @@ public abstract class AbstractSessionBuilder { public ZoneId zoneId = null; public int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY; public int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + public int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; // this field only take effect in write request, nothing to do with any other type requests, // like query, load and so on. // if set to true, it means that we may redirect the write request to its corresponding leader diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 572e6792e7c42..6382529940731 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -470,6 +470,7 @@ public Session(AbstractSessionBuilder builder) { this.zoneId = builder.zoneId; this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize; this.thriftMaxFrameSize = builder.thriftMaxFrameSize; + this.connectionTimeoutInMs = builder.connectionTimeoutInMs; this.version = builder.version; this.useSSL = builder.useSSL; this.trustStore = builder.trustStore; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java index ec5a4d6cc99ee..58c0b4a957f83 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java @@ -36,7 +36,7 @@ public SubscriptionSessionWrapper(final AbstractSessionBuilder builder) { } public void open() throws IoTDBConnectionException { - super.open(); + super.open(enableThriftRpcCompaction, connectionTimeoutInMs); } public void close() throws IoTDBConnectionException { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java index 41e26161c04c6..c1efd16557d88 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java @@ -58,6 +58,11 @@ public SubscriptionTableSessionBuilder thriftMaxFrameSize(final int thriftMaxFra return this; } + public SubscriptionTableSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) { + super.connectionTimeoutInMs = connectionTimeoutInMs; + return this; + } + public ISubscriptionTableSession build() throws IoTDBConnectionException { final ISubscriptionTableSession session = new SubscriptionTableSession(this); session.open(); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java index dc21732ba971f..8a1fa67d8223d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java @@ -57,6 +57,11 @@ public SubscriptionTreeSessionBuilder thriftMaxFrameSize(final int thriftMaxFram return this; } + public SubscriptionTreeSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) { + super.connectionTimeoutInMs = connectionTimeoutInMs; + return this; + } + public ISubscriptionTreeSession build() { return new SubscriptionTreeSession(this); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index a12340e9d7662..e27f182549e38 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -118,6 +118,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { private final Set inFlightFilesCommitContextSet = new HashSet<>(); private final int thriftMaxFrameSize; + private final int connectionTimeoutInMs; private final int maxPollParallelism; @SuppressWarnings("java:S3077") @@ -187,6 +188,7 @@ protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder this.fileSaveFsync = builder.fileSaveFsync; this.thriftMaxFrameSize = builder.thriftMaxFrameSize; + this.connectionTimeoutInMs = builder.connectionTimeoutInMs; this.maxPollParallelism = builder.maxPollParallelism; } @@ -232,6 +234,11 @@ protected AbstractSubscriptionConsumer( properties.getOrDefault( ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY, SessionConfig.DEFAULT_MAX_FRAME_SIZE)) + .connectionTimeoutInMs( + (Integer) + properties.getOrDefault( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, + SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS)) .maxPollParallelism( (Integer) properties.getOrDefault( @@ -382,7 +389,9 @@ protected abstract AbstractSubscriptionProvider constructSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs); AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint) throws SubscriptionException { @@ -393,7 +402,9 @@ AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPo this.password, this.consumerId, this.consumerGroupId, - this.thriftMaxFrameSize); + this.thriftMaxFrameSize, + this.heartbeatIntervalMs, + this.connectionTimeoutInMs); try { provider.handshake(); } catch (final Exception e) { @@ -1428,6 +1439,7 @@ protected Map allReportMessage() { result.put("fileSaveFsync", String.valueOf(fileSaveFsync)); result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString()); result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize)); + result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs)); result.put("maxPollParallelism", String.valueOf(maxPollParallelism)); result.put("subscribedTopics", subscribedTopics.toString()); return result; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 7f965069e73e2..81bfb6241c902 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -48,6 +48,7 @@ public class AbstractSubscriptionConsumerBuilder { protected boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + protected int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; protected int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; public AbstractSubscriptionConsumerBuilder host(final String host) { @@ -120,6 +121,12 @@ public AbstractSubscriptionConsumerBuilder thriftMaxFrameSize(final int thriftMa return this; } + public AbstractSubscriptionConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public AbstractSubscriptionConsumerBuilder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 9bf119c76c428..7f3582d195d6a 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -90,13 +90,16 @@ public abstract class AbstractSubscriptionProvider { private final String username; private final String password; + private final long heartbeatIntervalMs; + private final int connectionTimeoutInMs; protected abstract AbstractSessionBuilder constructSubscriptionSessionBuilder( final String host, final int port, final String username, final String password, - final int thriftMaxFrameSize); + final int thriftMaxFrameSize, + final int connectionTimeoutInMs); protected AbstractSubscriptionProvider( final TEndPoint endPoint, @@ -104,16 +107,25 @@ protected AbstractSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { this.session = new SubscriptionSessionWrapper( constructSubscriptionSessionBuilder( - endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize)); + endPoint.ip, + endPoint.port, + username, + password, + thriftMaxFrameSize, + connectionTimeoutInMs)); this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; this.username = username; this.password = password; + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.connectionTimeoutInMs = connectionTimeoutInMs; } SubscriptionSessionConnection getSessionConnection() throws IoTDBConnectionException { @@ -164,6 +176,10 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, session.getSqlDialect()); + consumerAttributes.put( + ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + consumerAttributes.put( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, String.valueOf(connectionTimeoutInMs)); final PipeSubscribeHandshakeResp resp = handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java index 3d9561adab27f..7083a7dc4af60 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java @@ -102,6 +102,13 @@ public AbstractSubscriptionPullConsumerBuilder thriftMaxFrameSize(final int thri return this; } + @Override + public AbstractSubscriptionPullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public AbstractSubscriptionPullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java index 9dba1c3989713..f013b98dd1951 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java @@ -108,6 +108,13 @@ public AbstractSubscriptionPushConsumerBuilder thriftMaxFrameSize(final int thri return this; } + @Override + public AbstractSubscriptionPushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public AbstractSubscriptionPushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 40492876b887b..1b90866db9e37 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -32,8 +32,18 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { - super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { + super( + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } @Override @@ -42,12 +52,14 @@ protected AbstractSessionBuilder constructSubscriptionSessionBuilder( final int port, final String username, final String password, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final int connectionTimeoutInMs) { return new SubscriptionTableSessionBuilder() .host(host) .port(port) .username(username) .password(password) - .thriftMaxFrameSize(thriftMaxFrameSize); + .thriftMaxFrameSize(thriftMaxFrameSize) + .connectionTimeoutInMs(connectionTimeoutInMs); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 9e51f7438ff01..83dd39aebbf7d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -44,9 +44,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTableProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index b85b669876a88..6d8437ac95f03 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -99,6 +99,13 @@ public SubscriptionTablePullConsumerBuilder thriftMaxFrameSize(final int thriftM return this; } + @Override + public SubscriptionTablePullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index 4fc2c352af841..ac44e421dac57 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -40,9 +40,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTableProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index fcd62b235e2dc..c372c586db394 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -101,6 +101,13 @@ public SubscriptionTablePushConsumerBuilder thriftMaxFrameSize(final int thriftM return this; } + @Override + public SubscriptionTablePushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index 56b07667f2d2a..c79b64e8c846f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -32,8 +32,18 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { - super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { + super( + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } @Override @@ -42,12 +52,14 @@ protected AbstractSessionBuilder constructSubscriptionSessionBuilder( final int port, final String username, final String password, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final int connectionTimeoutInMs) { return new SubscriptionTreeSessionBuilder() .host(host) .port(port) .username(username) .password(password) - .thriftMaxFrameSize(thriftMaxFrameSize); + .thriftMaxFrameSize(thriftMaxFrameSize) + .connectionTimeoutInMs(connectionTimeoutInMs); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 713dd601e2d83..23050893f660d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -51,9 +51,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTreeProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// @@ -78,6 +87,7 @@ private SubscriptionTreePullConsumer(final SubscriptionTreePullConsumer.Builder .fileSaveDir(builder.fileSaveDir) .fileSaveFsync(builder.fileSaveFsync) .thriftMaxFrameSize(builder.thriftMaxFrameSize) + .connectionTimeoutInMs(builder.connectionTimeoutInMs) .maxPollParallelism(builder.maxPollParallelism) .autoCommit(builder.autoCommit) .autoCommitIntervalMs(builder.autoCommitIntervalMs)); @@ -233,6 +243,7 @@ public static class Builder { private boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; private int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE; @@ -306,6 +317,11 @@ public Builder thriftMaxFrameSize(final int thriftMaxFrameSize) { return this; } + public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public Builder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index f3d3b7afba82e..8623a49208776 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -99,6 +99,13 @@ public SubscriptionTreePullConsumerBuilder thriftMaxFrameSize(final int thriftMa return this; } + @Override + public SubscriptionTreePullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index cd5c548121e2d..d56e89d47c86d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -50,9 +50,18 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTreeProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// @@ -77,6 +86,7 @@ private SubscriptionTreePushConsumer(final Builder builder) { .fileSaveDir(builder.fileSaveDir) .fileSaveFsync(builder.fileSaveFsync) .thriftMaxFrameSize(builder.thriftMaxFrameSize) + .connectionTimeoutInMs(builder.connectionTimeoutInMs) .maxPollParallelism(builder.maxPollParallelism) .ackStrategy(builder.ackStrategy) .consumeListener(builder.consumeListener) @@ -187,6 +197,7 @@ public static class Builder { private boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; private int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; private AckStrategy ackStrategy = AckStrategy.defaultValue(); @@ -263,6 +274,11 @@ public Builder thriftMaxFrameSize(final int thriftMaxFrameSize) { return this; } + public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public Builder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index 44fde0ed4f0bb..dd0cb01763770 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -101,6 +101,13 @@ public SubscriptionTreePushConsumerBuilder thriftMaxFrameSize(final int thriftMa return this; } + @Override + public SubscriptionTreePushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java index 2e2f27b0a02d8..200728fe203e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.subscription.agent; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver; import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1; @@ -36,6 +38,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class SubscriptionReceiverAgent { @@ -54,10 +60,20 @@ public class SubscriptionReceiverAgent { PipeSubscribeResponseType.ACK.getType()); private final ThreadLocal receiverThreadLocal = new ThreadLocal<>(); + private final Set activeReceivers = ConcurrentHashMap.newKeySet(); + private final ScheduledExecutorService receiverTimeoutChecker = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + SubscriptionReceiverAgent.class.getSimpleName() + "-Timeout-Checker"); SubscriptionReceiverAgent() { RECEIVER_CONSTRUCTORS.put( PipeSubscribeRequestVersion.VERSION_1.getVersion(), SubscriptionReceiverV1::new); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + receiverTimeoutChecker, + this::checkReceiverTimeouts, + Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L), + Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L), + TimeUnit.MILLISECONDS); } public TPipeSubscribeResp handle(final TPipeSubscribeReq req) { @@ -67,7 +83,10 @@ public TPipeSubscribeResp handle(final TPipeSubscribeReq req) { final byte reqVersion = req.getVersion(); if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) { - return getReceiver(reqVersion).handle(req); + final SubscriptionReceiver receiver = getReceiver(reqVersion); + activeReceivers.add(receiver); + receiver.handleTimeout(); + return receiver.handle(req); } else { final TSStatus status = RpcUtils.getStatus( @@ -126,8 +145,13 @@ private SubscriptionReceiver setAndGetReceiver(final byte reqVersion) { public final void handleClientExit() { final SubscriptionReceiver receiver = receiverThreadLocal.get(); if (receiver != null) { + activeReceivers.remove(receiver); receiver.handleExit(); receiverThreadLocal.remove(); } } + + private void checkReceiverTimeouts() { + activeReceivers.forEach(SubscriptionReceiver::handleTimeout); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java index c636e1b2dc6f7..36e3c9b74f5e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java @@ -31,5 +31,7 @@ public interface SubscriptionReceiver { void handleExit(); + void handleTimeout(); + long remainingMs(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 203b93ef1e4bd..bfcbbaf850f7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -98,6 +98,7 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class); private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9; + private static final long HEARTBEAT_TIMEOUT_MULTIPLIER = 3L; private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); @@ -112,43 +113,55 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { private final ThreadLocal consumerConfigThreadLocal = new ThreadLocal<>(); private final ThreadLocal pollTimerThreadLocal = new ThreadLocal<>(); + private volatile ConsumerConfig sharedConsumerConfig; + private volatile boolean consumerInvalidated; + private volatile long lastActivityTimeMs = System.currentTimeMillis(); + private final AtomicLong inFlightRequestCount = new AtomicLong(0); private static final String SQL_DIALECT_TABLE_VALUE = "table"; @Override public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) { final short reqType = req.getType(); - if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) { - switch (PipeSubscribeRequestType.valueOf(reqType)) { - case HANDSHAKE: - return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req)); - case HEARTBEAT: - return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req)); - case SUBSCRIBE: - return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req)); - case UNSUBSCRIBE: - return handlePipeSubscribeUnsubscribe( - PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req)); - case POLL: - return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req)); - case COMMIT: - return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req)); - case CLOSE: - return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req)); - default: - break; + beforeHandle(reqType); + try { + if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) { + switch (PipeSubscribeRequestType.valueOf(reqType)) { + case HANDSHAKE: + return handlePipeSubscribeHandshake( + PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req)); + case HEARTBEAT: + return handlePipeSubscribeHeartbeat( + PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req)); + case SUBSCRIBE: + return handlePipeSubscribeSubscribe( + PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req)); + case UNSUBSCRIBE: + return handlePipeSubscribeUnsubscribe( + PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req)); + case POLL: + return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req)); + case COMMIT: + return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req)); + case CLOSE: + return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req)); + default: + break; + } } - } - final TSStatus status = - RpcUtils.getStatus( - TSStatusCode.SUBSCRIPTION_TYPE_ERROR, - String.format("Unknown PipeSubscribeRequestType %s.", reqType)); - LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status); - return new TPipeSubscribeResp( - status, - PipeSubscribeResponseVersion.VERSION_1.getVersion(), - PipeSubscribeResponseType.ACK.getType()); + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_TYPE_ERROR, + String.format("Unknown PipeSubscribeRequestType %s.", reqType)); + LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status); + return new TPipeSubscribeResp( + status, + PipeSubscribeResponseVersion.VERSION_1.getVersion(), + PipeSubscribeResponseType.ACK.getType()); + } finally { + inFlightRequestCount.decrementAndGet(); + } } @Override @@ -171,6 +184,41 @@ public void handleExit() { unsubscribeCompleteTopics(consumerConfig); consumerConfigThreadLocal.remove(); } + clearSharedConsumerState(); + } + + @Override + public void handleTimeout() { + final ConsumerConfig consumerConfig; + final long inactiveMs; + final long timeoutMs; + synchronized (this) { + consumerConfig = sharedConsumerConfig; + if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) { + return; + } + timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig); + inactiveMs = System.currentTimeMillis() - lastActivityTimeMs; + if (inactiveMs <= timeoutMs) { + return; + } + clearSharedConsumerState(); + } + + LOGGER.info( + "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.", + consumerConfig, + inactiveMs, + timeoutMs); + try { + closeConsumer(consumerConfig); + } catch (final Exception e) { + LOGGER.warn( + "Subscription: failed to close timed out consumer {} after {} ms inactivity", + consumerConfig, + inactiveMs, + e); + } } @Override @@ -241,6 +289,8 @@ private TPipeSubscribeResp handlePipeSubscribeHandshakeInternal( consumerConfig); } + activateConsumer(consumerConfig); + final int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); LOGGER.info( "Subscription: consumer {} handshake successfully, data node id: {}", @@ -659,6 +709,9 @@ private TPipeSubscribeResp handlePipeSubscribeCloseInternal(final PipeSubscribeC } closeConsumer(consumerConfig); + consumerConfigThreadLocal.remove(); + pollTimerThreadLocal.remove(); + clearSharedConsumerState(); return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS); } @@ -866,4 +919,37 @@ private void unsubscribe(final ConsumerConfig consumerConfig, final Set throw new SubscriptionException(exceptionMessage); } } + + private void beforeHandle(final short reqType) { + synchronized (this) { + if (consumerInvalidated) { + consumerConfigThreadLocal.remove(); + pollTimerThreadLocal.remove(); + if (PipeSubscribeRequestType.HANDSHAKE.getType() == reqType) { + consumerInvalidated = false; + } + } + inFlightRequestCount.incrementAndGet(); + lastActivityTimeMs = System.currentTimeMillis(); + } + } + + private void activateConsumer(final ConsumerConfig consumerConfig) { + synchronized (this) { + sharedConsumerConfig = consumerConfig; + consumerInvalidated = false; + lastActivityTimeMs = System.currentTimeMillis(); + } + } + + private void clearSharedConsumerState() { + sharedConsumerConfig = null; + consumerInvalidated = true; + } + + private long calculateConsumerInactivityTimeoutMs(final ConsumerConfig consumerConfig) { + return Math.max( + SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(), + consumerConfig.getHeartbeatIntervalMs() * HEARTBEAT_TIMEOUT_MULTIPLIER); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java new file mode 100644 index 0000000000000..ba0070187e368 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.receiver; + +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; +import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscriptionReceiverV1Test { + + @Test + public void testHandleTimeoutKeepsRecentlyActiveConsumer() throws Exception { + final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1(); + final ConsumerConfig consumerConfig = createConsumerConfig(1_000L); + + setField(receiver, "sharedConsumerConfig", consumerConfig); + setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 1_000L); + + receiver.handleTimeout(); + + Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig")); + Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated")); + } + + @Test + public void testHandleTimeoutSkipsConsumerWithInFlightRequests() throws Exception { + final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1(); + final ConsumerConfig consumerConfig = createConsumerConfig(1_000L); + + setField(receiver, "sharedConsumerConfig", consumerConfig); + setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 15_000L); + ((AtomicLong) getField(receiver, "inFlightRequestCount")).set(1L); + + receiver.handleTimeout(); + + Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig")); + Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated")); + } + + @Test + public void testCalculateConsumerInactivityTimeoutUsesDefaultTimeout() throws Exception { + final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1(); + + Assert.assertEquals( + SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(), + invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(1_000L))); + } + + @Test + public void testCalculateConsumerInactivityTimeoutUsesHeartbeatMultiple() throws Exception { + final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1(); + + Assert.assertEquals( + 15_000L, + invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L))); + } + + private long invokeCalculateConsumerInactivityTimeoutMs( + final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception { + final Method method = + SubscriptionReceiverV1.class.getDeclaredMethod( + "calculateConsumerInactivityTimeoutMs", ConsumerConfig.class); + method.setAccessible(true); + return (long) method.invoke(receiver, consumerConfig); + } + + private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) { + final Map attributes = new HashMap<>(); + attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID()); + attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID()); + attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + return new ConsumerConfig(attributes); + } + + private Object getField(final Object target, final String fieldName) throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(target); + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +}