diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index 34e3fc21abb40..ce388bd96436c 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -18,10 +18,11 @@ */ package org.apache.pulsar.io.elasticsearch; -import org.testcontainers.elasticsearch.ElasticsearchContainer; - import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +@Slf4j public class ElasticSearchTestBase { private static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) @@ -29,7 +30,10 @@ public class ElasticSearchTestBase { protected static ElasticsearchContainer createElasticsearchContainer() { return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m"); + .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") + .withEnv("ingest.geoip.downloader.enabled", "false") + .withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java index 75f80bf1c54b2..a78ae856a508c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java @@ -19,12 +19,10 @@ package org.apache.pulsar.tests.integration.io.sinks; import static org.testng.Assert.assertTrue; - import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; - import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.Data; @@ -93,7 +91,18 @@ public ElasticSearchSinkTester(boolean schemaEnable) { @Override protected ElasticSearchContainer createSinkService(PulsarCluster cluster) { - return new ElasticSearchContainer(cluster.getClusterName()); + ElasticSearchContainer elasticsearchContainer = new ElasticSearchContainer(cluster.getClusterName()); + configureElasticContainer(elasticsearchContainer); + return elasticsearchContainer; + } + + protected void configureElasticContainer(ElasticSearchContainer elasticContainer) { + elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false"); + + // allow disk to fill up beyond default 90% threshold + elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false"); + + elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 92b10cf0c30a8..36ec308682300 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -158,7 +158,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s .withEnv("journalSyncData", "false") .withEnv("journalMaxGroupWaitMSec", "0") .withEnv("clusterName", clusterName) + .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95") .withEnv("diskUsageThreshold", "0.99") + .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97") .withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize) ) );