Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;

Expand All @@ -48,6 +50,16 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
public static final String PULSAR_2_1_IMAGE_NAME = "apachepulsar/pulsar:2.1.0";
public static final String PULSAR_2_0_IMAGE_NAME = "apachepulsar/pulsar:2.0.0";

/**
* For debugging purposes, it is useful to have the ability to leave containers running.
* This mode can be activated by setting environment variables
* PULSAR_CONTAINERS_LEAVE_RUNNING=true and TESTCONTAINERS_REUSE_ENABLE=true
* After debugging, one can use this command to kill all containers that were left running:
* docker kill $(docker ps -q --filter "label=pulsarcontainer=true")
*/
public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));

private final String hostname;
private final String serviceName;
private final String serviceEntryPoint;
Expand All @@ -71,13 +83,8 @@ public PulsarContainer(String clusterName,
int servicePort,
int httpPort,
String httpPath) {
super(clusterName, DEFAULT_IMAGE_NAME);
this.hostname = hostname;
this.serviceName = serviceName;
this.serviceEntryPoint = serviceEntryPoint;
this.servicePort = servicePort;
this.httpPort = httpPort;
this.httpPath = httpPath;
this(clusterName, hostname, serviceName, serviceEntryPoint, servicePort, httpPort, httpPath,
DEFAULT_IMAGE_NAME);
}

public PulsarContainer(String clusterName,
Expand All @@ -95,6 +102,20 @@ public PulsarContainer(String clusterName,
this.servicePort = servicePort;
this.httpPort = httpPort;
this.httpPath = httpPath;

configureLeaveContainerRunning(this);
}

public static void configureLeaveContainerRunning(
GenericContainer<?> container) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
// use Testcontainers reuse containers feature to leave the container running
container.withReuse(true);
// add label that can be used to find containers that are left running.
container.withLabel("pulsarcontainer", "true");
// add a random label to prevent reuse of containers
container.withLabel("pulsarcontainer.random", UUID.randomUUID().toString());
}
}

@Override
Expand All @@ -109,6 +130,15 @@ protected void beforeStop() {
}
}

@Override
public void stop() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
return;
}
super.stop();
}

@Override
public String getContainerName() {
return clusterName + "-" + hostname;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_PORT;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -269,6 +270,7 @@ public void start() throws Exception {
GenericContainer<?> serviceContainer = service.getValue();
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(service.getKey());
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
serviceContainer.start();
log.info("Successfully start external service {}.", service.getKey());
});
Expand All @@ -280,12 +282,17 @@ public void startService(String networkAlias,
log.info("Starting external service {} ...", networkAlias);
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(networkAlias);
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
serviceContainer.start();
log.info("Successfully start external service {}", networkAlias);
}

public void stopService(String networkAlias,
GenericContainer<?> serviceContainer) {
public static void stopService(String networkAlias,
GenericContainer<?> serviceContainer) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
log.info("Stopping external service {} ...", networkAlias);
serviceContainer.stop();
log.info("Successfully stop external service {}", networkAlias);
Expand All @@ -309,6 +316,10 @@ public PrestoWorkerContainer getPrestoWorkerContainer() {
}

public synchronized void stop() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}

List<GenericContainer> containers = new ArrayList<>();

Expand Down Expand Up @@ -361,6 +372,10 @@ public void startPrestoWorker(String offloadDriver, String offloadProperties) {
}

public void stopPrestoWorker() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
if (sqlFollowWorkerContainers != null && sqlFollowWorkerContainers.size() > 0) {
for (PrestoWorkerContainer followWorker : sqlFollowWorkerContainers.values()) {
followWorker.stop();
Expand Down Expand Up @@ -495,13 +510,18 @@ public synchronized void startWorkers() {
}

public synchronized void stopWorkers() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
// Stop workers that have been initialized
workerContainers.values().parallelStream().forEach(WorkerContainer::stop);
workerContainers.clear();
}

public void startContainers(Map<String, GenericContainer<?>> containers) {
containers.forEach((name, container) -> {
PulsarContainer.configureLeaveContainerRunning(container);
container
.withNetwork(network)
.withNetworkAliases(name)
Expand All @@ -510,11 +530,19 @@ public void startContainers(Map<String, GenericContainer<?>> containers) {
});
}

public void stopContainers(Map<String, GenericContainer<?>> containers) {
public static void stopContainers(Map<String, GenericContainer<?>> containers) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
containers.values().parallelStream().forEach(GenericContainer::stop);
log.info("Successfully stop containers : {}", containers);
}

private static void logIgnoringStopDueToLeaveRunning() {
log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
}

public BrokerContainer getAnyBroker() {
return getAnyContainer(brokerContainers, "pulsar-broker");
}
Expand Down