Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.slf4j.Logger;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.RecoverableConnection;
import com.rabbitmq.client.RecoveryListener;

public abstract class AbstractConnectionProvider {
public abstract class AbstractConnectionProvider implements RecoveryListener {
private ConnectionFactoryProvider connectionFactoryProvider;
private InstanceInfoProvider instanceInfoProvider;
private IrisConfig config;
Expand Down Expand Up @@ -48,6 +50,10 @@ public synchronized Connection getConnection() {
setConnecting(true);
connect();

if (this.connection instanceof RecoverableConnection) {
((RecoverableConnection) this.connection).addRecoveryListener(this);
}

return connection;
}

Expand All @@ -65,12 +71,12 @@ private void connect() {
}
}

private void setConnecting(boolean connecting) {
protected void setConnecting(boolean connecting) {
this.connecting.set(connecting);
this.readinessCheck.setConnecting(connecting);
}

private void setTimedOut(boolean timedOut) {
protected void setTimedOut(boolean timedOut) {
this.readinessCheck.setTimedOut(timedOut);
this.livenessCheck.setTimedOut(timedOut);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Recoverable;

@Singleton
public class ConsumerConnectionProvider extends AbstractConnectionProvider {
private static final Logger log = LoggerFactory.getLogger(ConsumerConnectionProvider.class);
Expand All @@ -26,4 +28,24 @@ public ConsumerConnectionProvider(ConnectionFactoryProvider connectionFactoryPro
protected String getConnectionNamePrefix() {
return CONSUMER_PREFIX;
}

@Override
public void handleRecovery(final Recoverable recoverable) {
log.warn("ConsumerConnectionProvider handleRecovery!");
super.setConnecting(false);
super.setTimedOut(false);
}

@Override
public void handleRecoveryStarted(final Recoverable recoverable) {
log.warn("ConsumerConnectionProvider handleRecoveryStarted!");
super.setConnecting(true);
}

@Override
public void handleTopologyRecoveryStarted(final Recoverable recoverable) {
log.warn("ConsumerConnectionProvider handleTopologyRecoveryStarted!");
super.handleTopologyRecoveryStarted(recoverable);
super.setConnecting(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Recoverable;

import io.quarkus.runtime.StartupEvent;

@Singleton
Expand All @@ -37,4 +39,24 @@ public void onApplicationStart(@Observes StartupEvent event) {
protected String getConnectionNamePrefix() {
return PRODUCER_PREFIX;
}

@Override
public void handleRecovery(final Recoverable recoverable) {
log.warn("ProducerConnectionProvider handleRecovery!");
super.setConnecting(false);
super.setTimedOut(false);
}

@Override
public void handleRecoveryStarted(final Recoverable recoverable) {
log.warn("ProducerConnectionProvider handleRecoveryStarted!");
super.setConnecting(true);
}

@Override
public void handleTopologyRecoveryStarted(final Recoverable recoverable) {
log.warn("ProducerConnectionProvider handleTopologyRecoveryStarted!");
super.handleTopologyRecoveryStarted(recoverable);
super.setConnecting(true);
}
}
Loading