From 6f420b1638c5a99d3ca13785fd6cca01d6f79165 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 16 Feb 2022 00:01:26 -0500 Subject: [PATCH] KAFKA-10000: Add new preflight connector config validation logic --- .../kafka/connect/runtime/AbstractHerder.java | 120 +++++--- .../distributed/DistributedHerder.java | 143 +++++++++- .../distributed/DistributedHerderTest.java | 269 +++++++++++++++++- 3 files changed, 466 insertions(+), 66 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 2fe75a955b06c..166ac9f05c950 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; @@ -40,6 +42,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.Converter; @@ -349,9 +352,11 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { status.workerId(), status.trace()); } - protected Map validateBasicConnectorConfig(Connector connector, - ConfigDef configDef, - Map config) { + protected Map validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map config) { + return configDef.validateAll(config); + } + + protected Map validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map config) { return configDef.validateAll(config); } @@ -417,7 +422,23 @@ public Optional buildRestartPlan(RestartRequest request) { conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)) ); return Optional.of(new RestartPlan(request, stateInfo)); + } + + protected boolean connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + return connectorType == org.apache.kafka.connect.health.ConnectorType.SINK; + } + protected boolean connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + if (connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE) { + return SourceConnectorConfig.usesTopicCreation(connProps); + } else { + return SinkConnectorConfig.hasDlqTopicConfig(connProps); + } + } + + protected boolean connectorUsesProducer(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + return connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE + || SinkConnectorConfig.hasDlqTopicConfig(connProps); } ConfigInfos validateConnectorConfig(Map connectorProps, boolean doLog) { @@ -431,22 +452,20 @@ ConfigInfos validateConnectorConfig(Map connectorProps, boolean Connector connector = getConnector(connType); org.apache.kafka.connect.health.ConnectorType connectorType; ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector); + ConfigDef enrichedConfigDef; + Map validatedConnectorConfig; try { - ConfigDef baseConfigDef; if (connector instanceof SourceConnector) { - baseConfigDef = SourceConnectorConfig.configDef(); connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE; + enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false); + validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps); } else { - baseConfigDef = SinkConnectorConfig.configDef(); SinkConnectorConfig.validate(connectorProps); connectorType = org.apache.kafka.connect.health.ConnectorType.SINK; + enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false); + validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps); } - ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false); - Map validatedConnectorConfig = validateBasicConnectorConfig( - connector, - enrichedConfigDef, - connectorProps - ); + connectorProps.entrySet().stream() .filter(e -> e.getValue() == null) .map(Map.Entry::getKey) @@ -454,6 +473,7 @@ ConfigInfos validateConnectorConfig(Map connectorProps, boolean validatedConnectorConfig.computeIfAbsent(prop, ConfigValue::new) .addErrorMessage("Null value can not be supplied as the configuration value.") ); + List configValues = new ArrayList<>(validatedConnectorConfig.values()); Map configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys()); Set allGroups = new LinkedHashSet<>(enrichedConfigDef.groups()); @@ -487,40 +507,41 @@ ConfigInfos validateConnectorConfig(Map connectorProps, boolean ConfigInfos producerConfigInfos = null; ConfigInfos consumerConfigInfos = null; ConfigInfos adminConfigInfos = null; - if (connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) { - producerConfigInfos = validateClientOverrides(connName, - ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, - connectorConfig, - ProducerConfig.configDef(), - connector.getClass(), - connectorType, - ConnectorClientConfigRequest.ClientType.PRODUCER, - connectorClientConfigOverridePolicy); - return mergeConfigInfos(connType, configInfos, producerConfigInfos); - } else { - consumerConfigInfos = validateClientOverrides(connName, - ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, - connectorConfig, - ProducerConfig.configDef(), - connector.getClass(), - connectorType, - ConnectorClientConfigRequest.ClientType.CONSUMER, - connectorClientConfigOverridePolicy); - // check if topic for dead letter queue exists - String topic = connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG); - if (topic != null && !topic.isEmpty()) { - adminConfigInfos = validateClientOverrides(connName, - ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, - connectorConfig, - ProducerConfig.configDef(), - connector.getClass(), - connectorType, - ConnectorClientConfigRequest.ClientType.ADMIN, - connectorClientConfigOverridePolicy); - } + if (connectorUsesProducer(connectorType, connectorProps)) { + producerConfigInfos = validateClientOverrides( + connName, + ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, + connectorConfig, + ProducerConfig.configDef(), + connector.getClass(), + connectorType, + ConnectorClientConfigRequest.ClientType.PRODUCER, + connectorClientConfigOverridePolicy); } - return mergeConfigInfos(connType, configInfos, consumerConfigInfos, adminConfigInfos); + if (connectorUsesAdmin(connectorType, connectorProps)) { + adminConfigInfos = validateClientOverrides( + connName, + ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, + connectorConfig, + AdminClientConfig.configDef(), + connector.getClass(), + connectorType, + ConnectorClientConfigRequest.ClientType.ADMIN, + connectorClientConfigOverridePolicy); + } + if (connectorUsesConsumer(connectorType, connectorProps)) { + consumerConfigInfos = validateClientOverrides( + connName, + ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, + connectorConfig, + ConsumerConfig.configDef(), + connector.getClass(), + connectorType, + ConnectorClientConfigRequest.ClientType.CONSUMER, + connectorClientConfigOverridePolicy); + } + return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos); } finally { Plugins.compareAndSwapLoaders(savedLoader); } @@ -665,7 +686,7 @@ protected Connector getConnector(String connType) { return tempConnectors.computeIfAbsent(connType, k -> plugins().newConnector(k)); } - /* + /** * Retrieves ConnectorType for the corresponding connector class * @param connClass class of the connector */ @@ -673,6 +694,15 @@ public ConnectorType connectorTypeForClass(String connClass) { return ConnectorType.from(getConnector(connClass).getClass()); } + /** + * Retrieves ConnectorType for the class specified in the connector config + * @param connConfig the connector config; may not be null + * @return the {@link ConnectorType} of the connector + */ + public ConnectorType connectorTypeForConfig(Map connConfig) { + return connectorTypeForClass(connConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + } + /** * Checks a given {@link ConfigInfos} for validation error messages and adds an exception * to the given {@link Callback} if any were found. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 65a8e7e15b818..49d3a5278b8ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.ConnectException; @@ -58,6 +57,10 @@ import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; +import org.apache.kafka.connect.source.ExactlyOnceSupport; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; @@ -138,6 +141,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1); private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; + private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250; private static final int START_STOP_THREAD_POOL_SIZE = 8; private static final short BACKOFF_RETRIES = 5; @@ -842,21 +846,134 @@ public void deleteConnectorConfig(final String connName, final Callback validateBasicConnectorConfig(Connector connector, - ConfigDef configDef, - Map config) { - Map validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config); - if (connector instanceof SinkConnector) { - ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG); - String name = (String) validatedName.value(); - if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) { - validatedName.addErrorMessage("Consumer group for sink connector named " + name + - " conflicts with Connect worker group " + workerGroupId); + protected Map validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map config) { + Map result = super.validateSinkConnectorConfig(connector, configDef, config); + validateSinkConnectorGroupId(result); + return result; + } + + @Override + protected Map validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map config) { + Map result = super.validateSourceConnectorConfig(connector, configDef, config); + validateSourceConnectorExactlyOnceSupport(config, result, connector); + validateSourceConnectorTransactionBoundary(config, result, connector); + return result; + } + + + private void validateSinkConnectorGroupId(Map validatedConfig) { + ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG); + String name = (String) validatedName.value(); + if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) { + validatedName.addErrorMessage("Consumer group for sink connector named " + name + + " conflicts with Connect worker group " + workerGroupId); + } + } + + private void validateSourceConnectorExactlyOnceSupport( + Map rawConfig, + Map validatedConfig, + SourceConnector connector) { + ConfigValue validatedExactlyOnceSupport = validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG); + if (validatedExactlyOnceSupport.errorMessages().isEmpty()) { + // Should be safe to parse the enum from the user-provided value since it's passed validation so far + SourceConnectorConfig.ExactlyOnceSupportLevel exactlyOnceSupportLevel = + SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value())); + if (SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel)) { + if (!config.exactlyOnceSourceEnabled()) { + validatedExactlyOnceSupport.addErrorMessage("This worker does not have exactly-once source support enabled."); + } + + try { + ExactlyOnceSupport exactlyOnceSupport = connector.exactlyOnceSupport(rawConfig); + if (!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) { + final String validationErrorMessage; + // Would do a switch here but that doesn't permit matching on null values + if (exactlyOnceSupport == null) { + validationErrorMessage = "The connector does not implement the API required for preflight validation of exactly-once " + + "source support. Please consult the documentation for the connector to determine whether it supports exactly-once " + + "guarantees, and then consider reconfiguring the connector to use the value \"" + + SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED + + "\" for this property (which will disable this preflight check and allow the connector to be created)."; + } else if (ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) { + validationErrorMessage = "The connector does not support exactly-once delivery guarantees with the provided configuration."; + } else { + throw new ConnectException("Unexpected value returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport); + } + validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage); + } + } catch (Exception e) { + log.error("Failed while validating connector support for exactly-once guarantees", e); + String validationErrorMessage = "An unexpected error occurred during validation"; + String failureMessage = e.getMessage(); + if (failureMessage != null && !failureMessage.trim().isEmpty()) { + validationErrorMessage += ": " + failureMessage.trim(); + } else { + validationErrorMessage += "; please see the worker logs for more details."; + } + validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage); + } } } - return validatedConfig; } + private void validateSourceConnectorTransactionBoundary( + Map rawConfig, + Map validatedConfig, + SourceConnector connector) { + ConfigValue validatedTransactionBoundary = validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG); + if (validatedTransactionBoundary.errorMessages().isEmpty()) { + // Should be safe to parse the enum from the user-provided value since it's passed validation so far + SourceTask.TransactionBoundary transactionBoundary = + SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value())); + if (SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) { + try { + ConnectorTransactionBoundaries connectorTransactionSupport = connector.canDefineTransactionBoundaries(rawConfig); + if (connectorTransactionSupport == null) { + validatedTransactionBoundary.addErrorMessage( + "This connector has returned a null value from its canDefineTransactionBoundaries method, which is not permitted. " + + "The connector will be treated as if it cannot define its own transaction boundaries, and cannot be configured with " + + "'" + SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" + SourceTask.TransactionBoundary.CONNECTOR + "'." + ); + } else if (!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport)) { + validatedTransactionBoundary.addErrorMessage( + "The connector does not support connector-defined transaction boundaries with the given configuration. " + + "Please reconfigure it to use a different transaction boundary definition."); + } + } catch (Exception e) { + log.error("Failed while validating connector support for defining its own transaction boundaries", e); + String validationErrorMessage = "An unexpected error occurred during validation"; + String failureMessage = e.getMessage(); + if (failureMessage != null && !failureMessage.trim().isEmpty()) { + validationErrorMessage += ": " + failureMessage.trim(); + } else { + validationErrorMessage += "; please see the worker logs for more details."; + } + validatedTransactionBoundary.addErrorMessage(validationErrorMessage); + } + } + } + } + + @Override + protected boolean connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + return super.connectorUsesAdmin(connectorType, connProps) + || connectorUsesSeparateOffsetsTopicClients(connectorType, connProps); + } + + @Override + protected boolean connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + return super.connectorUsesConsumer(connectorType, connProps) + || connectorUsesSeparateOffsetsTopicClients(connectorType, connProps); + } + + private boolean connectorUsesSeparateOffsetsTopicClients(org.apache.kafka.connect.health.ConnectorType connectorType, Map connProps) { + if (connectorType != org.apache.kafka.connect.health.ConnectorType.SOURCE) { + return false; + } + return config.exactlyOnceSourceEnabled() + || !connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "").trim().isEmpty(); + } @Override public void putConnectorConfig(final String connName, final Map config, final boolean allowReplace, @@ -897,7 +1014,7 @@ public void putConnectorConfig(final String connName, final Map // snapshot yet. The existing task info should still be accurate. ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG - connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); + connectorTypeForConfig(config)); callback.onCompletion(null, new Created<>(!exists, info)); return null; }, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 996c8407c6ec2..54dacd6d82362 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; @@ -33,6 +32,7 @@ import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.SessionKey; import org.apache.kafka.connect.runtime.SinkConnectorConfig; +import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TopicStatus; @@ -52,6 +52,8 @@ import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.ConfigBackingStore; @@ -90,16 +92,18 @@ import static java.util.Collections.singletonList; import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; +import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.newCapture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -790,22 +794,261 @@ public void testCreateConnectorFailedValidation() throws Exception { PowerMock.verifyAll(); } - @SuppressWarnings("unchecked") @Test public void testConnectorNameConflictsWithWorkerGroupId() { Map config = new HashMap<>(CONN2_CONFIG); config.put(ConnectorConfig.NAME_CONFIG, "test-group"); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + SinkConnector connectorMock = PowerMock.createMock(SinkConnector.class); + + PowerMock.replayAll(connectorMock); // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with // the consumer group id we would use for this sink - Map validatedConfigs = - herder.validateBasicConnectorConfig(connectorMock, ConnectorConfig.configDef(), config); + Map validatedConfigs = herder.validateSinkConnectorConfig( + connectorMock, SinkConnectorConfig.configDef(), config); ConfigValue nameConfig = validatedConfigs.get(ConnectorConfig.NAME_CONFIG); - assertNotNull(nameConfig.errorMessages()); - assertFalse(nameConfig.errorMessages().isEmpty()); + assertEquals( + Collections.singletonList("Consumer group for sink connector named test-group conflicts with Connect worker group connect-test-group"), + nameConfig.errorMessages()); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidation() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config))) + .andReturn(ExactlyOnceSupport.SUPPORTED); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals(Collections.emptyList(), errors); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config))) + .andReturn(ExactlyOnceSupport.UNSUPPORTED); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals( + Collections.singletonList("The connector does not support exactly-once delivery guarantees with the provided configuration."), + errors); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidationOnUnknownConnector() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config))) + .andReturn(null); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("The connector does not implement the API required for preflight validation of exactly-once source support.")); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + String errorMessage = "time to add a new unit test :)"; + EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config))) + .andThrow(new NullPointerException(errorMessage)); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains(errorMessage)); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() { + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config))) + .andReturn(ExactlyOnceSupport.SUPPORTED); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals( + Collections.singletonList("This worker does not have exactly-once source support enabled."), + errors); + + PowerMock.verifyAll(); + } + + @Test + public void testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, "invalid"); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("String must be one of (case insensitive): ")); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorTransactionBoundaryValidation() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config))) + .andReturn(ConnectorTransactionBoundaries.SUPPORTED); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertEquals(Collections.emptyList(), errors); + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorTransactionBoundaryValidationOnUnsupportedConnector() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config))) + .andReturn(ConnectorTransactionBoundaries.UNSUPPORTED); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("The connector does not support connector-defined transaction boundaries with the given configuration.")); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + String errorMessage = "Wait I thought we tested for this?"; + EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config))) + .andThrow(new ConnectException(errorMessage)); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains(errorMessage)); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() { + herder = exactlyOnceHerder(); + Map config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, "CONNECTOR.toString()"); + + SourceConnector connectorMock = PowerMock.createMock(SourceConnector.class); + + PowerMock.replayAll(connectorMock); + + Map validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("String must be one of (case insensitive): ")); + assertEquals(1, errors.size()); + + PowerMock.verifyAll(); } @Test @@ -2851,4 +3094,14 @@ private abstract class BogusSourceConnector extends SourceConnector { private abstract class BogusSourceTask extends SourceTask { } + private DistributedHerder exactlyOnceHerder() { + Map config = new HashMap<>(HERDER_CONFIG); + config.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + return PowerMock.createPartialMock(DistributedHerder.class, + new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"}, + new DistributedConfig(config), worker, WORKER_ID, KAFKA_CLUSTER_ID, + statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy, + new AutoCloseable[0]); + } + }