Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,12 @@ public void startClusters(Map<String, String> additionalMM2Config) throws Except
.build();

primary.start();
primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did not start in time.");

waitForTopicCreated(primary, "mm2-status.backup.internal");
waitForTopicCreated(primary, "mm2-offsets.backup.internal");
waitForTopicCreated(primary, "mm2-configs.backup.internal");

backup.start();
backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time.");

primaryProducer = initializeProducer(primary);
backupProducer = initializeProducer(backup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ public void setup() throws Exception {

// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time"
);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ public void testAddAndRemoveWorker() throws Exception {
// set up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);

Expand Down Expand Up @@ -218,9 +215,6 @@ public void testRestartFailedTask() throws Exception {
props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");

Comment thread
gharris1727 marked this conversation as resolved.
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// Try to start the connector and its single task.
connect.configureConnector(CONNECTOR_NAME, props);

Expand Down Expand Up @@ -258,9 +252,6 @@ public void testBrokerCoordinator() throws Exception {
// set up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);

Expand Down Expand Up @@ -312,9 +303,6 @@ public void testTaskStatuses() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// base connector props
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
Expand Down Expand Up @@ -352,8 +340,6 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
.build();
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");

// and when the connector is not configured to create topics
Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
Expand Down Expand Up @@ -405,9 +391,6 @@ public void testPauseStopResume() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// Want to make sure to use multiple tasks
final int numTasks = 4;
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
Expand Down Expand Up @@ -497,9 +480,6 @@ public void testStoppedState() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
// Fail the connector on startup
props.put("connector.start.inject.error", "true");
Expand Down Expand Up @@ -572,11 +552,6 @@ public void testTasksConfigDeprecation() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);

connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
Expand All @@ -600,9 +575,6 @@ public void testCreateConnectorWithPausedInitialState() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
Expand Down Expand Up @@ -634,9 +606,6 @@ public void testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() t
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

// Configure the connector to produce a maximum of 10 messages
Expand Down Expand Up @@ -687,9 +656,6 @@ public void testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() thr
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// Create topic and produce 10 messages
connect.kafka().createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -754,9 +720,6 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// Create a connector with PAUSED initial state
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
Expand Down Expand Up @@ -806,9 +769,6 @@ public void testPatchConnectorConfig() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

connect.kafka().createTopic(TOPIC_NAME);

Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
Expand Down Expand Up @@ -859,8 +819,6 @@ public void testRequestTimeouts() throws Exception {
.numWorkers(1)
.build();
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(1,
"Worker did not start in time");

Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
Expand Down Expand Up @@ -927,8 +885,6 @@ public void testPollTimeoutExpiry() throws Exception {

connect.start();

connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");

Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
Expand Down Expand Up @@ -1008,11 +964,6 @@ public void testTasksMaxEnforcement() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);

Map<String, String> connectorProps = defaultSourceConnectorProps(TOPIC_NAME);
int maxTasks = 1;
connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
Expand Down Expand Up @@ -1177,11 +1128,6 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
numWorkers,
"Initial group of workers did not start in time."
);

final String connectorTopic = "connector-topic";
connect.kafka().createTopic(connectorTopic, 1);

Expand Down Expand Up @@ -1214,7 +1160,7 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception {

connect.assertions().assertAtLeastNumWorkersAreUp(
numWorkers,
"Initial group of workers did not start in time."
"Workers did not start in time after cluster was rolled."
);

final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0);
Expand Down Expand Up @@ -1270,11 +1216,6 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
numWorkers,
"Initial group of workers did not start in time."
);

final String firstConnectorTopic = "connector-topic-1";
connect.kafka().createTopic(firstConnectorTopic);

Expand Down Expand Up @@ -1344,11 +1285,6 @@ public void testRuntimePropertyReconfiguration() throws Exception {
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);

final String topic = "kafka9228";
connect.kafka().createTopic(topic, 1);
connect.kafka().produce(topic, "non-json-value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws In

// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

return connect;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception
connect.start();
return connect;
});
connect.assertions().assertExactlyNumWorkersAreUp(numWorkers,
"Initial group of workers did not start in time.");
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ public void testGetActiveTopics() throws InterruptedException {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for connector: " + FOO_CONNECTOR);

Expand Down Expand Up @@ -179,8 +177,6 @@ public void testTopicTrackingResetIsDisabled() throws InterruptedException {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for connector: " + FOO_CONNECTOR);

Expand Down Expand Up @@ -235,8 +231,6 @@ public void testTopicTrackingIsDisabled() throws InterruptedException {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);

connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

// start a source connector
connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public void setup() throws InterruptedException {

// start Connect cluster
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");

// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,6 @@ public void testFencedLeaderRecovery() throws Exception {
connectorHandle.expectedRecords(MINIMUM_MESSAGES);
connectorHandle.expectedCommits(MINIMUM_MESSAGES);

// make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around)
connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time");

// fence out the leader of the cluster
Producer<?, ?> zombieLeader = transactionalProducer(
"simulated-zombie-leader",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedExce

// Start the Connect cluster
connect.start();
connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time.");
connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time.");
log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers);

// Check the topics
Expand Down Expand Up @@ -111,9 +109,6 @@ public void testCreateInternalTopicsWithFewerReplicasThanBrokers() throws Interr

// Start the Connect cluster
connect.start();
connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
connect.assertions().assertAtLeastNumWorkersAreUp(numWorkers, "Worker did not start in time.");
log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers);

// Check the topics
log.info("Verifying the internal topics for Connect");
Expand All @@ -126,7 +121,7 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I
workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
int numWorkers = 1;
int numWorkers = 0;
int numBrokers = 1;
connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
.workerProps(workerProps)
Expand All @@ -137,11 +132,14 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I

// Start the brokers and Connect, but Connect should fail to create config and offset topic
connect.start();
connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

// Try to start a worker
connect.addWorker();

// Verify that the offset and config topic don't exist;
// the status topic may have been created if timing was right but we don't care
// TODO: Synchronously await and verify that the worker fails during startup
log.info("Verifying the internal topics for Connect");
connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());
}
Expand Down Expand Up @@ -169,7 +167,6 @@ public void testFailToStartWhenInternalTopicsAreNotCompacted() throws Interrupte
// Start the brokers but not Connect
log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers);
connect.start();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh interesting, this sets numWorkers=0, and therefore can call the blocking start() method safely.

WDYT about changing testFailToCreateInternalTopicsWithMoreReplicasThanBrokers to use the same pattern? That would eliminate the need for the non-blocking start method, and simplify the control flow.

The minority of call-sites are going to expect the workers to fail to start up, so I think it's okay to use a workaround instead of giving them a first-class method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to rework testFailToCreateInternalTopicsWithMoreReplicasThanBrokers, but just in case someone downstream is using start in a way that relies on the existing behavior, I'd also like to keep the non-blocking method in, even if we don't use it in any of our tests. I know we're not under any obligation to, but it's such a small change that the negligible maintenance burden seems worth the tradeoff. Does that work for you?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For downstream users intentionally giving the embedded connect bad configs, they will need to alter the test anyway, so I'd prefer that they follow the pattern used by upstream.

I don't think that anyone should really be relying on this behavior; otherwise I would make an argument for start() remaining non-blocking (which I definitely don't want).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great point. I've removed the overloaded variant and tweaked the only remaining integration test that uses it. That test itself is actually subtly broken since it doesn't (and never did) synchronously wait for worker startup before checking that internal topics don't exist, but since this PR doesn't introduce a regression I've thrown a TODO into the codebase and left everything else for a follow-up.

connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

// Create the good topics
Expand Down Expand Up @@ -243,7 +240,6 @@ public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefau
// Start the brokers but not Connect
log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers);
connect.start();
connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

// Create the valid internal topics w/o topic settings, so these will use the broker's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,6 @@ private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<St

result.start();

try {
result.assertions().assertExactlyNumWorkersAreUp(
NUM_WORKERS,
"Workers did not complete startup in time"
);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while awaiting cluster startup", e);
}

return result;
});
}
Expand Down
Loading