diff --git a/extension/runtime/src/main/java/org/iris_events/runtime/connection/AbstractConnectionProvider.java b/extension/runtime/src/main/java/org/iris_events/runtime/connection/AbstractConnectionProvider.java index 771450d6..e9bc7a9a 100644 --- a/extension/runtime/src/main/java/org/iris_events/runtime/connection/AbstractConnectionProvider.java +++ b/extension/runtime/src/main/java/org/iris_events/runtime/connection/AbstractConnectionProvider.java @@ -12,8 +12,10 @@ import org.slf4j.Logger; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoveryListener; -public abstract class AbstractConnectionProvider { +public abstract class AbstractConnectionProvider implements RecoveryListener { private ConnectionFactoryProvider connectionFactoryProvider; private InstanceInfoProvider instanceInfoProvider; private IrisConfig config; @@ -48,6 +50,10 @@ public synchronized Connection getConnection() { setConnecting(true); connect(); + if (this.connection instanceof RecoverableConnection) { + ((RecoverableConnection) this.connection).addRecoveryListener(this); + } + return connection; } @@ -65,12 +71,12 @@ private void connect() { } } - private void setConnecting(boolean connecting) { + protected void setConnecting(boolean connecting) { this.connecting.set(connecting); this.readinessCheck.setConnecting(connecting); } - private void setTimedOut(boolean timedOut) { + protected void setTimedOut(boolean timedOut) { this.readinessCheck.setTimedOut(timedOut); this.livenessCheck.setTimedOut(timedOut); } diff --git a/extension/runtime/src/main/java/org/iris_events/runtime/connection/ConsumerConnectionProvider.java b/extension/runtime/src/main/java/org/iris_events/runtime/connection/ConsumerConnectionProvider.java index 7a301a33..467b8df1 100644 --- a/extension/runtime/src/main/java/org/iris_events/runtime/connection/ConsumerConnectionProvider.java +++ b/extension/runtime/src/main/java/org/iris_events/runtime/connection/ConsumerConnectionProvider.java @@ -10,6 +10,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.Recoverable; + @Singleton public class ConsumerConnectionProvider extends AbstractConnectionProvider { private static final Logger log = LoggerFactory.getLogger(ConsumerConnectionProvider.class); @@ -26,4 +28,24 @@ public ConsumerConnectionProvider(ConnectionFactoryProvider connectionFactoryPro protected String getConnectionNamePrefix() { return CONSUMER_PREFIX; } + + @Override + public void handleRecovery(final Recoverable recoverable) { + log.warn("ConsumerConnectionProvider handleRecovery!"); + super.setConnecting(false); + super.setTimedOut(false); + } + + @Override + public void handleRecoveryStarted(final Recoverable recoverable) { + log.warn("ConsumerConnectionProvider handleRecoveryStarted!"); + super.setConnecting(true); + } + + @Override + public void handleTopologyRecoveryStarted(final Recoverable recoverable) { + log.warn("ConsumerConnectionProvider handleTopologyRecoveryStarted!"); + super.handleTopologyRecoveryStarted(recoverable); + super.setConnecting(true); + } } diff --git a/extension/runtime/src/main/java/org/iris_events/runtime/connection/ProducerConnectionProvider.java b/extension/runtime/src/main/java/org/iris_events/runtime/connection/ProducerConnectionProvider.java index 2da54a99..bc6fa0da 100644 --- a/extension/runtime/src/main/java/org/iris_events/runtime/connection/ProducerConnectionProvider.java +++ b/extension/runtime/src/main/java/org/iris_events/runtime/connection/ProducerConnectionProvider.java @@ -11,6 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.Recoverable; + import io.quarkus.runtime.StartupEvent; @Singleton @@ -37,4 +39,24 @@ public void onApplicationStart(@Observes StartupEvent event) { protected String getConnectionNamePrefix() { return PRODUCER_PREFIX; } + + @Override + public void handleRecovery(final Recoverable recoverable) { + log.warn("ProducerConnectionProvider handleRecovery!"); + super.setConnecting(false); + super.setTimedOut(false); + } + + @Override + public void handleRecoveryStarted(final Recoverable recoverable) { + log.warn("ProducerConnectionProvider handleRecoveryStarted!"); + super.setConnecting(true); + } + + @Override + public void handleTopologyRecoveryStarted(final Recoverable recoverable) { + log.warn("ProducerConnectionProvider handleTopologyRecoveryStarted!"); + super.handleTopologyRecoveryStarted(recoverable); + super.setConnecting(true); + } }