diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe316..58db84d7be433 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -134,7 +134,7 @@
-              files="(ConfigKeyInfo).java"/>
+              files="(ConfigKeyInfo|Worker).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d89f577688f35..271f20ada6336 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -298,7 +298,7 @@ public void removeMetrics() {
     }

     @Override
-    protected void close() {
+    protected void doClose() {
         if (started) {
             Utils.closeQuietly(task::stop, "source task");
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 485dda9b98b1b..a97088d4edd1c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.apache.kafka.connect.util.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -273,13 +274,14 @@ public boolean includeRecordDetailsInErrorLog() {
     public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);

-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
-        for (String alias : transformAliases) {
-            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+        try (Closeables closeables = new Closeables()) {
+            final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+            for (String alias : transformAliases) {
+                final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";

-            try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+                closeables.register(transformation, "transformation " + alias + " for connector " + getString(NAME_CONFIG));
                 Map<String, Object> configs = originalsWithPrefix(prefix);
                 Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
                 Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
@@ -288,17 +290,16 @@ public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
+                    closeables.register(predicate, "predicate " + predicateAlias + " for connector " + getString(NAME_CONFIG));
                     predicate.configure(originalsWithPrefix(predicatePrefix));
                     transformations.add(new PredicatedTransformation<>(predicate, negate == null ?
                             false : Boolean.parseBoolean(negate.toString()), transformation));
                 } else {
                     transformations.add(transformation);
                 }
-            } catch (Exception e) {
-                throw new ConnectException(e);
             }
+            closeables.clear();
+            return transformations;
         }
-
-        return transformations;
     }

     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 16e48d8f17edf..10ff55bc59c02 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -66,6 +66,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.Closeables;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.LoggingContext;
@@ -171,7 +172,6 @@ public Worker(
         this.globalOffsetBackingStore.configure(config);

         this.workerConfigTransformer = initConfigTransformer();
-
     }

     private WorkerConfigTransformer initConfigTransformer() {
@@ -278,8 +278,8 @@ public void startConnector(
             TargetState initialState,
             Callback<TargetState> onConnectorStateChange
     ) {
-        final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
-        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName); Closeables closeables = new Closeables()) {
+            final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
             if (connectors.containsKey(connName)) {
                 onConnectorStateChange.onCompletion(
                         new ConnectException("Connector with name " + connName + " already exists"),
@@ -287,18 +287,16 @@ public void startConnector(
                 return;
             }

+            final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connClass);
+            // If something fails, make sure we switch back to the connector's loader before we start
+            // deallocating resources that were instantiated with that loader
+            closeables.useLoader(connectorLoader);
             final WorkerConnector workerConnector;
-            ClassLoader savedLoader = plugins.currentThreadLoader();
-            try {
-                // By the time we arrive here, CONNECTOR_CLASS_CONFIG has been validated already
-                // Getting this value from the unparsed map will allow us to instantiate the
-                // right config (source or sink)
-                final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connClass);
-                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
-
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
                 log.info("Creating connector {} of type {}", connName, connClass);
                 final Connector connector = plugins.newConnector(connClass);
+                closeables.register(connector::stop, "connector " + connName);

                 final ConnectorConfig connConfig;
                 final CloseableOffsetStorageReader offsetReader;
                 final ConnectorOffsetBackingStore offsetStore;
@@ -314,19 +312,17 @@ public void startConnector(
                     offsetStore = config.exactlyOnceSourceEnabled() ?
                            offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector) :
                            offsetStoreForRegularSourceConnector(sourceConfig, connName, connector);
+                    closeables.register(offsetStore, "offset store for " + connName);
                     offsetStore.configure(config);
                     offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
+                    closeables.register(offsetReader, "offset reader for " + connName);
                 }

                 workerConnector = new WorkerConnector(
                         connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader);
                 log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                 workerConnector.transitionTo(initialState, onConnectorStateChange);
-                Plugins.compareAndSwapLoaders(savedLoader);
             } catch (Throwable t) {
                 log.error("Failed to start connector {}", connName, t);
-                // Can't be put in a finally block because it needs to be swapped before the call on
-                // statusListener
-                Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusListener.onFailure(connName, t);
                 onConnectorStateChange.onCompletion(t, null);
                 return;
@@ -337,14 +333,13 @@
                 onConnectorStateChange.onCompletion(
                         new ConnectException("Connector with name " + connName + " already exists"),
                         null);
-                // Don't need to do any cleanup of the WorkerConnector instance (such as calling
-                // shutdown() on it) here because it hasn't actually started running yet
                 return;
             }

             executor.submit(workerConnector);

             log.info("Finished creating connector {}", connName);
+            closeables.clear();
         }
     }

@@ -360,12 +355,8 @@ public boolean isSinkConnector(String connName) {
         if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");

-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = Plugins.compareAndSwapLoaders(workerConnector.loader());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
             return workerConnector.isSinkConnector();
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
         }
     }

@@ -387,10 +378,8 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
             int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
             Map<String, String> connOriginals = connConfig.originalsStrings();

-            Connector connector = workerConnector.connector();
-            ClassLoader savedLoader = plugins.currentThreadLoader();
-            try {
-                savedLoader = Plugins.compareAndSwapLoaders(workerConnector.loader());
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
+                Connector connector = workerConnector.connector();
                 String taskClassName = connector.taskClass().getName();
                 for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
                     // Ensure we don't modify the connector's copy of the config
@@ -404,8 +393,6 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
                     }
                     result.add(taskConfig);
                 }
-            } finally {
-                Plugins.compareAndSwapLoaders(savedLoader);
             }
         }

@@ -427,12 +414,8 @@ private void stopConnector(String connName) {
                 return;
             }

-            ClassLoader savedLoader = plugins.currentThreadLoader();
-            try {
-                savedLoader = Plugins.compareAndSwapLoaders(workerConnector.loader());
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
                 workerConnector.shutdown();
-            } finally {
-                Plugins.compareAndSwapLoaders(savedLoader);
             }
         }
     }

@@ -541,8 +524,13 @@ public boolean startSinkTask(
             TaskStatus.Listener statusListener,
             TargetState initialState
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
-                new SinkTaskBuilder(id, configState, statusListener, initialState));
+        try (Closeables closeables = new Closeables()) {
+            boolean started = startTask(id, connProps, taskProps, statusListener, closeables,
+                    new SinkTaskBuilder(id, configState, statusListener, initialState));
+            if (started)
+                closeables.clear();
+            return started;
+        }
     }

     /**
@@ -564,8 +552,13 @@ public boolean startSourceTask(
             TaskStatus.Listener statusListener,
             TargetState initialState
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
-                new SourceTaskBuilder(id, configState, statusListener, initialState));
+        try (Closeables closeables = new Closeables()) {
+            boolean started = startTask(id, connProps, taskProps, statusListener, closeables,
+                    new SourceTaskBuilder(id, configState, statusListener, initialState));
+            if (started)
+                closeables.clear();
+            return started;
+        }
     }

     /**
@@ -592,8 +585,13 @@ public boolean startExactlyOnceSourceTask(
             Runnable preProducerCheck,
             Runnable postProducerCheck
     ) {
-        return startTask(id, connProps, taskProps, statusListener,
-                new ExactlyOnceSourceTaskBuilder(id, configState, statusListener, initialState, preProducerCheck, postProducerCheck));
+        try (Closeables closeables = new Closeables()) {
+            boolean started = startTask(id, connProps, taskProps, statusListener, closeables,
+                    new ExactlyOnceSourceTaskBuilder(id, configState, statusListener, initialState, preProducerCheck, postProducerCheck));
+            if (started)
+                closeables.clear();
+            return started;
+        }
     }

     /**
@@ -606,42 +604,44 @@ public boolean startExactlyOnceSourceTask(
      * @param taskBuilder the {@link TaskBuilder} used to create the {@link WorkerTask} that manages the lifecycle of the task.
      * @return true if the task started successfully.
      */
-    private boolean startTask(
+    // Visible for testing
+    boolean startTask(
             ConnectorTaskId id,
             Map<String, String> connProps,
             Map<String, String> taskProps,
             TaskStatus.Listener statusListener,
+            Closeables closeables,
             TaskBuilder taskBuilder
     ) {
-        final WorkerTask workerTask;
-        final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
         try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
+            final WorkerTask workerTask;
+            final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
             log.info("Creating task {}", id);

             if (tasks.containsKey(id))
                 throw new ConnectException("Task already exists in this worker: " + id);

             connectorStatusMetricsGroup.recordTaskAdded(id);
-            ClassLoader savedLoader = plugins.currentThreadLoader();
-            try {
-                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
-                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+            String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+            // If something fails, make sure we switch back to the connector's loader before we start
+            // deallocating resources that were instantiated with that loader
+            closeables.useLoader(connectorLoader);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
                 final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
                 final TaskConfig taskConfig = new TaskConfig(taskProps);
                 final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
                 final Task task = plugins.newTask(taskClass);
+                closeables.register(task::stop, "task " + id);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); - // By maintaining connector's specific class loader for this thread here, we first - // search for converters within the connector dependencies. + // By maintaining the connector's specific class loader here, we first + // search for converters within the connector's dependencies. // If any of these aren't found, that means the connector didn't configure specific converters, // so we should instantiate based upon the worker configuration Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage .CURRENT_CLASSLOADER); Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); - HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.CURRENT_CLASSLOADER); if (keyConverter == null) { keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); @@ -654,6 +654,8 @@ private boolean startTask( } else { log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); } + HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.CURRENT_CLASSLOADER); if (headerConverter == null) { headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage .PLUGINS); @@ -661,6 +663,7 @@ private boolean startTask( } else { log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); } + closeables.register(headerConverter, "header converter for task " + id); workerTask = taskBuilder .withTask(task) @@ -669,15 +672,11 @@ private boolean startTask( .withValueConverter(valueConverter) .withHeaderConverter(headerConverter) .withClassloader(connectorLoader) - .build(); + .build(closeables); workerTask.initialize(taskConfig); - Plugins.compareAndSwapLoaders(savedLoader); } catch (Throwable t) { log.error("Failed to start task {}", id, t); - // Can't be put in a finally block because it needs to be swapped before the call on - // statusListener - Plugins.compareAndSwapLoaders(savedLoader); connectorStatusMetricsGroup.recordTaskRemoved(id); taskStatusListener.onFailure(id, t); return false; @@ -713,7 +712,7 @@ KafkaFuture fenceZombies(String connName, int numTasks, Map connClass = plugins.connectorClass( connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); @@ -728,23 +727,21 @@ KafkaFuture fenceZombies(String connName, int numTasks, Map transactionalIds = IntStream.range(0, numTasks) - .mapToObj(i -> new ConnectorTaskId(connName, i)) - .map(this::taskTransactionalId) - .collect(Collectors.toList()); - FenceProducersOptions fencingOptions = new FenceProducersOptions() - .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> { - if (error != null) - log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName); - Utils.closeQuietly(admin, "Zombie fencing admin for connector " + connName); - }); - } catch (Exception e) { + closeables.register(admin, "zombie fencing 
admin for " + connName); + + Collection transactionalIds = IntStream.range(0, numTasks) + .mapToObj(i -> new ConnectorTaskId(connName, i)) + .map(this::taskTransactionalId) + .collect(Collectors.toList()); + FenceProducersOptions fencingOptions = new FenceProducersOptions() + .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + KafkaFuture result = admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> { + if (error != null) + log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName); Utils.closeQuietly(admin, "Zombie fencing admin for connector " + connName); - throw e; - } + }); + closeables.clear(); + return result; } } } @@ -960,9 +957,11 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { private List sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics, - Class connectorClass) { + Class connectorClass, + Closeables closeables) { ArrayList reporters = new ArrayList<>(); LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); + closeables.register(logReporter, "log reporter for task " + id); reporters.add(logReporter); // check if topic for dead letter queue exists @@ -972,7 +971,7 @@ private List sinkTaskReporters(ConnectorTaskId id, SinkConnectorC connectorClientConfigOverridePolicy, kafkaClusterId); Map adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK); DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics); - + closeables.register(reporter, "dead letter queue reporter for task " + id); reporters.add(reporter); } @@ -980,12 +979,11 @@ private List sinkTaskReporters(ConnectorTaskId id, SinkConnectorC } private List sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, - ErrorHandlingMetrics errorHandlingMetrics) { - List reporters = new ArrayList<>(); + ErrorHandlingMetrics errorHandlingMetrics, + Closeables closeables) { LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); - reporters.add(logReporter); - - return reporters; + closeables.register(logReporter, "log reporter for task " + id); + return Collections.singletonList(logReporter); } private WorkerErrantRecordReporter createWorkerErrantRecordReporter( @@ -1014,12 +1012,8 @@ private void stopTask(ConnectorTaskId taskId) { if (task instanceof WorkerSourceTask) sourceTaskOffsetCommitter.ifPresent(committer -> committer.remove(task.id())); - ClassLoader savedLoader = plugins.currentThreadLoader(); - try { - savedLoader = Plugins.compareAndSwapLoaders(task.loader()); + try (LoaderSwap loaderSwap = plugins.withClassLoader(task.loader())) { task.stop(); - } finally { - Plugins.compareAndSwapLoaders(savedLoader); } } } @@ -1137,31 +1131,22 @@ public void setTargetState(String connName, TargetState state, Callback workerConnector.transitionTo(state, stateChangeCallback), - connectorLoader); + ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(workerConnector.connector()); + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + workerConnector.transitionTo(state, stateChangeCallback); + } } for (Map.Entry taskEntry : tasks.entrySet()) { if (taskEntry.getKey().connector().equals(connName)) { WorkerTask workerTask = taskEntry.getValue(); - 
-                    executeStateTransition(() -> workerTask.transitionTo(state), workerTask.loader);
+                    try (LoaderSwap loaderSwap = plugins.withClassLoader(workerTask.loader)) {
+                        workerTask.transitionTo(state);
+                    }
                 }
             }
         }
     }

-    private void executeStateTransition(Runnable stateTransition, ClassLoader loader) {
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = Plugins.compareAndSwapLoaders(loader);
-            stateTransition.run();
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
-        }
-    }
-
     ConnectorStatusMetricsGroup connectorStatusMetricsGroup() {
         return connectorStatusMetricsGroup;
     }
@@ -1224,7 +1209,7 @@ public TaskBuilder withClassloader(ClassLoader classLoader) {
             return this;
         }

-        public WorkerTask build() {
+        public WorkerTask build(Closeables closeables) {
             Objects.requireNonNull(task, "Task cannot be null");
             Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
             Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
@@ -1233,15 +1218,17 @@ public WorkerTask build() {
             Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");

             ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+            closeables.register(errorHandlingMetrics, "error handling metrics for task " + id);
             final Class<? extends Connector> connectorClass = plugins.connectorClass(
                 connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
             RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
                     connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
+            closeables.register(retryWithToleranceOperator, "retry-with-tolerance operator for task " + id);
             retryWithToleranceOperator.metrics(errorHandlingMetrics);

             return doBuild(task, id, configState, statusListener, initialState,
                     connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
-                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator, closeables);
         }

         abstract WorkerTask doBuild(Task task,
@@ -1256,7 +1243,8 @@ abstract WorkerTask doBuild(Task task,
                                     ClassLoader classLoader,
                                     ErrorHandlingMetrics errorHandlingMetrics,
                                     Class<? extends Connector> connectorClass,
-                                    RetryWithToleranceOperator retryWithToleranceOperator);
+                                    RetryWithToleranceOperator retryWithToleranceOperator,
+                                    Closeables closeables);
     }

@@ -1281,19 +1269,22 @@ public WorkerTask doBuild(Task task,
                                   ClassLoader classLoader,
                                   ErrorHandlingMetrics errorHandlingMetrics,
                                   Class<? extends Connector> connectorClass,
-                                  RetryWithToleranceOperator retryWithToleranceOperator) {
+                                  RetryWithToleranceOperator retryWithToleranceOperator,
+                                  Closeables closeables) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.transformations(), retryWithToleranceOperator);
+            closeables.register(transformationChain, "transformation chain for task " + id);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass, closeables));
             WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
                     keyConverter, valueConverter, headerConverter);

             Map<String, Object> consumerProps = baseConsumerConfigs(
id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + closeables.register(consumer, "consumer for task " + id); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, classLoader, time, @@ -1322,17 +1313,19 @@ public WorkerTask doBuild(Task task, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class connectorClass, - RetryWithToleranceOperator retryWithToleranceOperator) { - + RetryWithToleranceOperator retryWithToleranceOperator, + Closeables closeables) { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); - retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics, closeables)); TransformationChain transformationChain = new TransformationChain<>(sourceConfig.transformations(), retryWithToleranceOperator); + closeables.register(transformationChain, "transformation chain for task " + id); log.info("Initializing: {}", transformationChain); Map producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); KafkaProducer producer = new KafkaProducer<>(producerProps); + closeables.register(producer, "producer for task " + id); TopicAdmin topicAdmin = null; final boolean topicCreationEnabled = sourceConnectorTopicCreationEnabled(sourceConfig); @@ -1340,6 +1333,7 @@ public WorkerTask doBuild(Task task, Map adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); topicAdmin = new TopicAdmin(adminOverrides); + closeables.register(topicAdmin, "topic admin for task " + id); } Map topicCreationGroups = topicCreationEnabled @@ -1349,9 +1343,11 @@ public WorkerTask doBuild(Task task, // Set up the offset backing store for this task instance ConnectorOffsetBackingStore offsetStore = offsetStoreForRegularSourceTask( id, sourceConfig, connectorClass, producer, producerProps, topicAdmin); + closeables.register(offsetStore, "offset store for task " + id); offsetStore.configure(config); CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + closeables.register(offsetReader, "offset reader for task " + id); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); // Note we pass the configState as it performs dynamic transformations under the covers @@ -1390,23 +1386,26 @@ public WorkerTask doBuild(Task task, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class connectorClass, - RetryWithToleranceOperator retryWithToleranceOperator) { - + RetryWithToleranceOperator retryWithToleranceOperator, + Closeables closeables) { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); - 
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics, closeables));
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.transformations(), retryWithToleranceOperator);
+            closeables.register(transformationChain, "transformation chain for task " + id);
             log.info("Initializing: {}", transformationChain);

             Map<String, Object> producerProps = exactlyOnceSourceTaskProducerConfigs(
                     id, config, sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+            closeables.register(producer, "producer for task " + id);

             // Create a topic admin that the task will use for its offsets topic and, potentially, automatic topic creation
             Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
                     sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
             TopicAdmin topicAdmin = new TopicAdmin(adminOverrides);
+            closeables.register(topicAdmin, "topic admin for task " + id);

             Map<String, TopicCreationGroup> topicCreationGroups = sourceConnectorTopicCreationEnabled(sourceConfig)
                     ? TopicCreationGroup.configuredGroups(sourceConfig)
@@ -1415,16 +1414,20 @@ public WorkerTask doBuild(Task task,
             // Set up the offset backing store for this task instance
             ConnectorOffsetBackingStore offsetStore = offsetStoreForExactlyOnceSourceTask(
                     id, sourceConfig, connectorClass, producer, producerProps, topicAdmin);
+            closeables.register(offsetStore, "offset store for task " + id);
             offsetStore.configure(config);

             CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+            closeables.register(offsetReader, "offset reader for task " + id);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);

             // Note we pass the configState as it performs dynamic transformations under the covers
-            return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
+            ExactlyOnceWorkerSourceTask result = new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
                     headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore,
                     config, configState, metrics, classLoader, time, retryWithToleranceOperator, herder.statusBackingStore(),
                     sourceConfig, executor, preProducerCheck, postProducerCheck);
+            closeables.clear();
+            return result;
         }
     }

@@ -1446,36 +1449,104 @@ ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
                 && config.connectorOffsetsTopicsPermitted();

         if (usesConnectorSpecificStore) {
-            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+            try (Closeables closeables = new Closeables()) {
+                Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
                     connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
                     connectorClientConfigOverridePolicy, kafkaClusterId);
+                KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+                closeables.register(consumer, "consumer for " + connName);
+
+                Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
+                        sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+                TopicAdmin admin = new TopicAdmin(adminOverrides);
+                closeables.register(admin, "topic admin for connector " + connName);
+                KafkaOffsetBackingStore connectorStore =
+                        KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
+
+                // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+                // an offset store that has a primary and a secondary store which both read from that same topic.
+                // So, if the user has explicitly configured the connector with a connector-specific offsets topic
+                // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+                // offset store and build a store backed exclusively by a connector-specific offsets store.
+                // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+                // would prevent users from being able to customize the config properties used for the Kafka clients that
+                // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+                // isolation.level=read_committed for the offsets topic consumer for this connector
+                ConnectorOffsetBackingStore result;
+                if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
+                    result = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                            () -> LoggingContext.forConnector(connName),
+                            connectorStore,
+                            connectorSpecificOffsetsTopic,
+                            admin
+                    );
+                } else {
+                    result = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                            () -> LoggingContext.forConnector(connName),
+                            globalOffsetBackingStore,
+                            connectorStore,
+                            connectorSpecificOffsetsTopic,
+                            admin
+                    );
+                }
+                closeables.clear();
+                return result;
+            }
+        } else {
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forConnector(connName),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+            SourceConnectorConfig sourceConfig,
+            String connName,
+            Connector connector
+    ) {
+        try (Closeables closeables = new Closeables()) {
+            String connectorSpecificOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+            Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+
+            Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
+                    connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+            closeables.register(consumer, "consumer for " + connName);

             Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
                     sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
-
             TopicAdmin admin = new TopicAdmin(adminOverrides);
+            closeables.register(admin, "topic admin for " + connName);
+
             KafkaOffsetBackingStore connectorStore =
                     KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);

             // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
             // an offset store that has a primary and a secondary store which both read from that same topic.
-            // So, if the user has explicitly configured the connector with a connector-specific offsets topic
-            // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+            // So, even if the user has explicitly configured the connector with a connector-specific offsets topic,
+            // if we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
             // offset store and build a store backed exclusively by a connector-specific offsets store.
             // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
             // would prevent users from being able to customize the config properties used for the Kafka clients that
-            // access the offsets topic, and we would not be able to establish reasonable defaults like setting
-            // isolation.level=read_committed for the offsets topic consumer for this connector
+            // access the offsets topic, and may lead to confusion for them when tasks are created for the connector
+            // since they will all have their own dedicated offsets stores anyways
+            ConnectorOffsetBackingStore result;
             if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
-                return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                result = ConnectorOffsetBackingStore.withOnlyConnectorStore(
                         () -> LoggingContext.forConnector(connName),
                         connectorStore,
                         connectorSpecificOffsetsTopic,
                         admin
                 );
             } else {
-                return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                result = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
                         () -> LoggingContext.forConnector(connName),
                         globalOffsetBackingStore,
                         connectorStore,
@@ -1483,67 +1554,75 @@ ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
                         admin
                 );
             }
-        } else {
-            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
-                    () -> LoggingContext.forConnector(connName),
-                    globalOffsetBackingStore,
-                    config.offsetsTopic()
-            );
+            closeables.clear();
+            return result;
         }
     }

     // Visible for testing
-    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+    ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
+            ConnectorTaskId id,
             SourceConnectorConfig sourceConfig,
-            String connName,
-            Connector connector
+            Class<? extends Connector> connectorClass,
+            Producer<byte[], byte[]> producer,
+            Map<String, Object> producerProps,
+            TopicAdmin topicAdmin
     ) {
-        String connectorSpecificOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();

-        Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
-                connectorClientConfigOverridePolicy, kafkaClusterId);
+        if (regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConfig)) {
+            try (Closeables closeables = new Closeables()) {
+                Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when configured to use their own offsets topic");

-        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
-                connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
-                connectorClientConfigOverridePolicy, kafkaClusterId);
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-        Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
-                sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
-
-        TopicAdmin admin = new TopicAdmin(adminOverrides);
-        KafkaOffsetBackingStore connectorStore =
-                KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
-
-        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
-        // an offset store that has a primary and a secondary store which both read from that same topic.
-        // So, even if the user has explicitly configured the connector with a connector-specific offsets topic,
-        // if we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
-        // offset store and build a store backed exclusively by a connector-specific offsets store.
-        // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
-        // would prevent users from being able to customize the config properties used for the Kafka clients that
-        // access the offsets topic, and may lead to confusion for them when tasks are created for the connector
-        // since they will all have their own dedicated offsets stores anyways
-        if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
-            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
-                    () -> LoggingContext.forConnector(connName),
-                    connectorStore,
-                    connectorSpecificOffsetsTopic,
-                    admin
-            );
+                Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+                        id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
+                        connectorClientConfigOverridePolicy, kafkaClusterId);
+                KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+                closeables.register(consumer, "consumer for task " + id);
+
+                KafkaOffsetBackingStore connectorStore =
+                        KafkaOffsetBackingStore.forTask(sourceConfig.offsetsTopic(), producer, consumer, topicAdmin);
+
+                // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+                // an offset store that has a primary and a secondary store which both read from that same topic.
+                // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
+                // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+                // offset store and build a store backed exclusively by a connector-specific offsets store.
+                // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+                // would prevent users from being able to customize the config properties used for the Kafka clients that
+                // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+                // isolation.level=read_committed for the offsets topic consumer for this task
+                ConnectorOffsetBackingStore result;
+                if (sameOffsetTopicAsWorker(sourceConfig.offsetsTopic(), producerProps)) {
+                    result = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                            () -> LoggingContext.forTask(id),
+                            connectorStore,
+                            connectorSpecificOffsetsTopic,
+                            topicAdmin
+                    );
+                } else {
+                    result = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            connectorStore,
+                            connectorSpecificOffsetsTopic,
+                            topicAdmin
+                    );
+                }
+                closeables.clear();
+                return result;
+            }
         } else {
-            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
-                    () -> LoggingContext.forConnector(connName),
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forTask(id),
                     globalOffsetBackingStore,
-                    connectorStore,
-                    connectorSpecificOffsetsTopic,
-                    admin
+                    config.offsetsTopic()
             );
         }
     }

     // Visible for testing
-    ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(
             ConnectorTaskId id,
             SourceConnectorConfig sourceConfig,
             Class<? extends Connector> connectorClass,
@@ -1551,97 +1630,47 @@ ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
             Map<String, Object> producerProps,
             TopicAdmin topicAdmin
     ) {
-        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+        try (Closeables closeables = new Closeables()) {
+            Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when exactly-once support is enabled");

-        if (regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConfig)) {
-            Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when configured to use their own offsets topic");
-
-            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+            Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
                     id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
                     connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+            closeables.register(consumer, "consumer for task " + id);
+
+            String connectorOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());

             KafkaOffsetBackingStore connectorStore =
-                    KafkaOffsetBackingStore.forTask(sourceConfig.offsetsTopic(), producer, consumer, topicAdmin);
+                    KafkaOffsetBackingStore.forTask(connectorOffsetsTopic, producer, consumer, topicAdmin);

             // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
             // an offset store that has a primary and a secondary store which both read from that same topic.
             // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
             // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
             // offset store and build a store backed exclusively by a connector-specific offsets store.
-        // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
-        // would prevent users from being able to customize the config properties used for the Kafka clients that
-        // access the offsets topic, and we would not be able to establish reasonable defaults like setting
-        // isolation.level=read_committed for the offsets topic consumer for this task
-        if (sameOffsetTopicAsWorker(sourceConfig.offsetsTopic(), producerProps)) {
-            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+            // We cannot under any circumstances build an offset store backed exclusively by the worker-global offset store
+            // as that would prevent us from being able to write source records and source offset information for the task
+            // with the same producer, and therefore, in the same transaction.
+            ConnectorOffsetBackingStore result;
+            if (sameOffsetTopicAsWorker(connectorOffsetsTopic, producerProps)) {
+                result = ConnectorOffsetBackingStore.withOnlyConnectorStore(
                     () -> LoggingContext.forTask(id),
                     connectorStore,
-                    connectorSpecificOffsetsTopic,
+                    connectorOffsetsTopic,
                     topicAdmin
             );
         } else {
-            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+            result = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
                     () -> LoggingContext.forTask(id),
                     globalOffsetBackingStore,
                     connectorStore,
-                    connectorSpecificOffsetsTopic,
+                    connectorOffsetsTopic,
                     topicAdmin
             );
         }
-        } else {
-            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
-                    () -> LoggingContext.forTask(id),
-                    globalOffsetBackingStore,
-                    config.offsetsTopic()
-            );
-        }
-    }
-
-    // Visible for testing
-    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(
-            ConnectorTaskId id,
-            SourceConnectorConfig sourceConfig,
-            Class<? extends Connector> connectorClass,
-            Producer<byte[], byte[]> producer,
-            Map<String, Object> producerProps,
-            TopicAdmin topicAdmin
-    ) {
-        Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when exactly-once support is enabled");
-
-        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
-                id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
-                connectorClientConfigOverridePolicy, kafkaClusterId);
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-        String connectorOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
-
-        KafkaOffsetBackingStore connectorStore =
-                KafkaOffsetBackingStore.forTask(connectorOffsetsTopic, producer, consumer, topicAdmin);
-
-        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
-        // an offset store that has a primary and a secondary store which both read from that same topic.
-        // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
-        // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
-        // offset store and build a store backed exclusively by a connector-specific offsets store.
-        // We cannot under any circumstances build an offset store backed exclusively by the worker-global offset store
-        // as that would prevent us from being able to write source records and source offset information for the task
-        // with the same producer, and therefore, in the same transaction.
-        if (sameOffsetTopicAsWorker(connectorOffsetsTopic, producerProps)) {
-            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
-                    () -> LoggingContext.forTask(id),
-                    connectorStore,
-                    connectorOffsetsTopic,
-                    topicAdmin
-            );
-        } else {
-            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
-                    () -> LoggingContext.forTask(id),
-                    globalOffsetBackingStore,
-                    connectorStore,
-                    connectorOffsetsTopic,
-                    topicAdmin
+            );
+            closeables.clear();
+            return result;
         }
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 01303e308d7ee..198729d70a35c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -164,7 +164,7 @@ public void stop() {
     }

     @Override
-    protected void close() {
+    protected void doClose() {
         // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
         // passed in
         try {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ea086199aae23..10bb6980fa7e0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -49,7 +49,7 @@
  * if the task fails at the same time. To protect from these cases, we synchronize status updates
  * using the WorkerTask's monitor.
  */
-abstract class WorkerTask implements Runnable {
+abstract class WorkerTask implements Runnable, AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
     private static final String THREAD_NAME_PREFIX = "task-thread-";

@@ -153,7 +153,7 @@ public void removeMetrics() {

     protected abstract void execute();

-    protected abstract void close();
+    protected abstract void doClose();

     protected boolean isStopping() {
         return stopping;
@@ -163,9 +163,10 @@ protected boolean isCancelled() {
         return cancelled;
     }

-    private void doClose() {
+    @Override
+    public void close() {
         try {
-            close();
+            doClose();
         } catch (Throwable t) {
             log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t);
             throw t;
@@ -197,7 +198,7 @@ private void doRun() throws InterruptedException {
                 throw t;
             }
         } finally {
-            doClose();
+            close();
         }
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 4781136c2f880..a45c16b1839de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.util.Closeables;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,8 +95,13 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
             }
         }

-        KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
+        try (Closeables closeables = new Closeables()) {
+            KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
+            closeables.register(dlqProducer, "dead letter queue producer for task " + id);
+            DeadLetterQueueReporter result = new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
+            closeables.clear();
+            return result;
+        }
     }

     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
index 419bea97f1200..e96832e3403bc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -23,16 +23,20 @@
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Contains various sensors used for monitoring errors.
  */
-public class ErrorHandlingMetrics {
+public class ErrorHandlingMetrics implements AutoCloseable {

     private final Time time = new SystemTime();

     private final ConnectMetrics.MetricGroup metricGroup;

+    private static final Logger log = LoggerFactory.getLogger(ErrorHandlingMetrics.class);
+
     // metrics
     private final Sensor recordProcessingFailures;
     private final Sensor recordProcessingErrors;
@@ -138,4 +142,13 @@ public void recordErrorTimestamp() {
     public ConnectMetrics.MetricGroup metricGroup() {
         return metricGroup;
     }
+
+    /**
+     * Close the task's error metrics group when the task is closed
+     */
+    @Override
+    public void close() {
+        log.debug("Removing error handling metrics of group {}", metricGroup.groupId());
+        metricGroup.close();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
index b33315b9f3cad..e6d369c97cdee 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
@@ -43,7 +43,7 @@
  * An {@link OffsetBackingStore} with support for reading from and writing to a worker-global
  * offset backing store and/or a connector-specific offset backing store.
  */
-public class ConnectorOffsetBackingStore implements OffsetBackingStore {
+public class ConnectorOffsetBackingStore implements OffsetBackingStore, AutoCloseable {

     private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);

@@ -187,6 +187,14 @@ public void stop() {
         connectorStoreAdmin.ifPresent(TopicAdmin::close);
     }

+    /**
+     * Synonym for {@link #stop()}, for use with try-with-resources blocks.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
     /**
      * Get the offset values for the specified keys.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Closeables.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Closeables.java
new file mode 100644
index 0000000000000..0a19863a8fdc5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Closeables.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This class allows developers to easily track multiple {@link AutoCloseable} objects allocated
+ * at different times and, if necessary, {@link AutoCloseable#close} them.
+ * <p>
+ * Users should create an instance in a try-with-resources block,
+ * {@link #register(AutoCloseable, String) register} resources that they allocate inside that
+ * block, and optionally {@link #clear() clear} those resources before exiting the block if they
+ * will be used outside it.
+ * <p>
+ * For example:
+ * <pre> {@code
+ * try (Closeables closeables = new Closeables()) {
+ *      // Switch to the connector's classloader if something goes wrong and we have to close the
+ *      // resources we've allocated for it
+ *      closeables.useLoader(connectorClassLoader);
+ *
+ *     Converter keyConverter = createConverter(KEY_CONVERTER_CONFIG, connectorConfig);
+ *     // Close the key converter if any of the next steps fails
+ *     closeables.register(keyConverter, "task key converter");
+ *
+ *     Converter valueConverter = createConverter(VALUE_CONVERTER_CONFIG, connectorConfig);
+ *     // Close the value converter if any of the next steps fails
+ *     closeables.register(valueConverter, "task value converter");
+ *
+ *     HeaderConverter headerConverter = createHeaderConverter(connectorConfig);
+ *     // Close the header converter if any of the next steps fails
+ *     closeables.register(headerConverter, "task header converter");
+ *
+ *     WorkerTask workerTask = createWorkerTask(keyConverter, valueConverter, headerConverter);
+ *
+ *     // We've successfully created our task; clear our closeables since we want to keep using
+ *     // these
+ *     closeables.clear();
+ * }
+ * }</pre>
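+ * <p>
+ * In the example above, if any step after a {@code register} call throws, exiting the
+ * try-with-resources block closes every converter registered up to that point, switching
+ * to {@code connectorClassLoader} first; on the success path, the final {@code clear()}
+ * call is what keeps the converters open once the task owns them.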
+ * + * Thread safety: this class is not thread-safe and is only intended to be accessed from a single + * thread per instance. + */ +public class Closeables implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(Closeables.class); + + private final Map<AutoCloseable, String> closeables; + private ClassLoader loader; + + public Closeables() { + closeables = new IdentityHashMap<>(); + } + + /** + * Register a resource to be {@link AutoCloseable#close() closed} when this {@link Closeables} + * object is {@link #close() closed}. + * @param closeable the closeable resource to track; if null, will be silently ignored + * @param name a description of the closeable resource to use for logging; may not be null + */ + public void register(AutoCloseable closeable, String name) { + Objects.requireNonNull(name, "name may not be null"); + if (closeable == null) { + log.trace("Ignoring null closeable: {}", name); + } else { + log.trace("Registering closeable {}: {}", name, closeable); + this.closeables.put(closeable, name); + } + } + + /** + * Set a {@link ClassLoader} to switch to when {@link AutoCloseable#close() closing} + * resources that have been {@link #register(AutoCloseable, String) registered}. + *
<p>
+ * May not be invoked more than once. + * @param loader the loader to use; may not be null + * @throws IllegalStateException if invoked more than once + */ + public void useLoader(ClassLoader loader) { + if (this.loader != null) { + throw new IllegalStateException("May only define classloader once"); + } + Objects.requireNonNull(loader, "class loader may not be null"); + this.loader = loader; + } + + /** + * Forget any resources that have been {@link #register(AutoCloseable, String) registered} + * before this call. Note that if a {@link ClassLoader} has been set via + * {@link #useLoader(ClassLoader)}, it will remain set, and subsequent calls to + * {@link #useLoader(ClassLoader)} will still fail. + */ + public void clear() { + closeables.clear(); + } + + /** + * {@link AutoCloseable#close() Close} all resources that have been + * {@link #register(AutoCloseable, String) registered}, using the {@link ClassLoader} set via + * {@link #useLoader(ClassLoader)} (if one has been set), except those that have been forgotten + * by a call to {@link #clear()}. + *
<p>
+ * If any call to {@link AutoCloseable#close()} fails, the exception is caught, logged, and not + * propagated to the caller. + */ + @Override + public void close() { + if (closeables.isEmpty()) + return; + + try (Utils.UncheckedCloseable loaderSwap = maybeSwapLoaders()) { + closeables.forEach(Utils::closeQuietly); + closeables.clear(); + } + } + + private Utils.UncheckedCloseable maybeSwapLoaders() { + if (loader != null) { + return new LoaderSwap(loader)::close; + } else { + return () -> { }; + } + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 0af14cc7f30ec..f34b175ae7e4b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -18,8 +18,8 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.ConnectException; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index d4184bbab2a52..41a6212c35d0d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -248,7 +248,7 @@ public void testSinkTasksCloseErrorReporters() throws Exception { workerSinkTask.initialize(TASK_CONFIG); workerSinkTask.initializeAndStart(); - workerSinkTask.close(); + workerSinkTask.doClose(); PowerMock.verifyAll(); } @@ -271,7 +271,7 @@ public void testSourceTasksCloseErrorReporters() { PowerMock.replayAll(); workerSourceTask.initialize(TASK_CONFIG); - workerSourceTask.close(); + workerSourceTask.doClose(); PowerMock.verifyAll(); } @@ -299,7 +299,7 @@ public void testCloseErrorReportersExceptionPropagation() { PowerMock.replayAll(); workerSourceTask.initialize(TASK_CONFIG); - workerSourceTask.close(); + workerSourceTask.doClose(); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestCloseables.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestCloseables.java new file mode 100644 index 0000000000000..8532c96be65a7 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestCloseables.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.util.Closeables; + +import java.util.HashMap; +import java.util.Map; + +public class TestCloseables extends Closeables { + + private final Map<Class<?>, Integer> registered; + + private ClassLoader loader; + private boolean cleared; + + public TestCloseables() { + super(); + this.registered = new HashMap<>(); + } + + @Override + public void register(AutoCloseable closeable, String description) { + super.register(closeable, description); + this.registered.compute(closeable.getClass(), (c, numRegistered) -> numRegistered == null ? 1 : numRegistered + 1); + } + + @Override + public void useLoader(ClassLoader loader) { + super.useLoader(loader); + this.loader = loader; + } + + @Override + public void clear() { + super.clear(); + this.cleared = true; + } + + public Map<Class<?>, Integer> registered() { + return registered; + } + + public boolean cleared() { + return cleared; + } + + public ClassLoader loader() { + return loader; + } + +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 41d4de5e0a86e..336a1f3626cfb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -356,7 +356,7 @@ public void testShutdown() throws Exception { sinkTaskContext.getValue().requestCommit(); // Force an offset commit workerTask.iteration(); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index eef6ca82be63e..a8ac2a0667aaa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -175,7 +175,7 @@ public void testPollsInBackground() throws Exception { workerTask.iteration(); } workerTask.stop(); - workerTask.close(); + workerTask.doClose(); // Verify contents match expected values, i.e. that they were translated properly.
With max // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches @@ -224,7 +224,7 @@ public void testCommit() throws Exception { // Commit finishes synchronously for testing so we can check this immediately assertEquals(0, workerTask.commitFailures()); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); assertEquals(2, capturedRecords.getValues().size()); @@ -265,7 +265,7 @@ public void testCommitFailure() throws Exception { assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -307,7 +307,7 @@ public void testCommitSuccessFollowedByFailure() throws Exception { assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -340,7 +340,7 @@ public void testCommitConsumerFailure() throws Exception { assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -377,7 +377,7 @@ public void testCommitTimeout() throws Exception { assertEquals(1, workerTask.commitFailures()); assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -439,7 +439,7 @@ public void testAssignmentPauseResume() throws Exception { workerTask.iteration(); workerTask.iteration(); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -477,7 +477,7 @@ public void testRewind() throws Exception { workerTask.iteration(); workerTask.iteration(); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } @@ -503,7 +503,7 @@ public void testRewindOnRebalanceDuringPoll() throws Exception { workerTask.iteration(); workerTask.iteration(); workerTask.stop(); - workerTask.close(); + workerTask.doClose(); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 890c0f7399c5a..f45880ed3cf4a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -94,7 +94,7 @@ public void standardStartup() { .addMockedMethod("initialize") .addMockedMethod("initializeAndStart") .addMockedMethod("execute") - .addMockedMethod("close") + .addMockedMethod("doClose") .createStrictMock(); workerTask.initialize(TASK_CONFIG); @@ -109,7 +109,7 @@ public void standardStartup() { statusListener.onStartup(taskId); expectLastCall(); - workerTask.close(); + workerTask.doClose(); expectLastCall(); statusListener.onShutdown(taskId); @@ -144,13 +144,13 @@ public void stopBeforeStarting() { retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") - .addMockedMethod("close") + .addMockedMethod("doClose") .createStrictMock(); workerTask.initialize(TASK_CONFIG); EasyMock.expectLastCall(); - workerTask.close(); + workerTask.doClose(); EasyMock.expectLastCall(); replay(workerTask); @@ -185,7 +185,7 @@ public void cancelBeforeStopping() throws Exception { 
.addMockedMethod("initialize") .addMockedMethod("initializeAndStart") .addMockedMethod("execute") - .addMockedMethod("close") + .addMockedMethod("doClose") .createStrictMock(); final CountDownLatch stopped = new CountDownLatch(1); @@ -211,7 +211,7 @@ public void cancelBeforeStopping() throws Exception { statusListener.onStartup(taskId); expectLastCall(); - workerTask.close(); + workerTask.doClose(); expectLastCall(); // there should be no call to onShutdown() diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index a064e296b223c..7f7a8a64d0f2f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.admin.FenceProducersResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.MetricName; @@ -42,6 +44,9 @@ import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.LogReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -58,6 +63,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -130,6 +136,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -304,7 +311,7 @@ public void testStartAndStopConnector() throws Throwable { connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); // Create - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader); when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector); @@ -350,8 +357,6 @@ public void testStartAndStopConnector() throws Throwable { worker.stop(); assertStatistics(worker, 0, 0); - - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(delegatingLoader).connectorLoader(connectorClass); verify(plugins).newConnector(connectorClass); @@ -386,7 +391,7 @@ public void testStartConnectorFailure() throws Exception { 
Exception exception = new ConnectException("Failed to find Connector"); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(nonConnectorClass)).thenReturn(delegatingLoader); pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(delegatingLoader); @@ -418,7 +423,6 @@ public void testStartConnectorFailure() throws Exception { assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 1, 1, 0, 0); - verify(plugins).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(plugins).delegatingLoader(); verify(delegatingLoader).connectorLoader(nonConnectorClass); @@ -432,7 +436,7 @@ public void testStartConnectorFailure() throws Exception { public void testAddConnectorByAlias() throws Throwable { final String connectorAlias = "SampleSourceConnector"; - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(plugins.newConnector(connectorAlias)).thenReturn(sinkConnector); when(delegatingLoader.connectorLoader(connectorAlias)).thenReturn(pluginLoader); @@ -469,7 +473,6 @@ public void testAddConnectorByAlias() throws Throwable { assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 1, 0, 0, 0); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(plugins).newConnector(connectorAlias); verify(delegatingLoader).connectorLoader(connectorAlias); @@ -489,7 +492,7 @@ public void testAddConnectorByAlias() throws Throwable { public void testAddConnectorByShortAlias() throws Throwable { final String shortConnectorAlias = "WorkerTest"; - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(plugins.newConnector(shortConnectorAlias)).thenReturn(sinkConnector); when(delegatingLoader.connectorLoader(shortConnectorAlias)).thenReturn(pluginLoader); @@ -520,7 +523,6 @@ public void testAddConnectorByShortAlias() throws Throwable { worker.stop(); assertStatistics(worker, 0, 0); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(plugins).newConnector(shortConnectorAlias); verify(sinkConnector, times(2)).version(); @@ -550,7 +552,7 @@ public void testStopInvalidConnector() { public void testReconfigureConnectorTasks() throws Throwable { final String connectorClass = SampleSourceConnector.class.getName(); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader); when(plugins.newConnector(connectorClass)).thenReturn(sinkConnector); @@ -612,7 +614,6 @@ public void testReconfigureConnectorTasks() throws Throwable { worker.stop(); assertStatistics(worker, 0, 0); - verify(plugins, times(3)).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(delegatingLoader).connectorLoader(connectorClass); verify(plugins).newConnector(connectorClass); @@ -632,7 +633,7 @@ public void testReconfigureConnectorTasks() throws Throwable { @Test public void testAddRemoveSourceTask() { - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); 
when(delegatingLoader.connectorLoader(SampleSourceConnector.class.getName())).thenReturn(pluginLoader); @@ -665,7 +666,6 @@ public void testAddRemoveSourceTask() { worker.stop(); assertStatistics(worker, 0, 0); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).newTask(TestSourceTask.class); verify(task).version(); verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); @@ -685,7 +685,7 @@ public void testAddRemoveSourceTask() { @Test public void testAddRemoveSinkTask() { // Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(SampleSinkConnector.class.getName())).thenReturn(pluginLoader); @@ -723,7 +723,6 @@ public void testAddRemoveSinkTask() { worker.stop(); assertStatistics(worker, 0, 0); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).newTask(TestSinkTask.class); verify(task).version(); verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); @@ -759,9 +758,9 @@ public void testAddRemoveExactlyOnceSourceTask() { workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); config = new DistributedConfig(workerProps); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(SampleSourceConnector.class.getName())).thenReturn(pluginLoader); + expectLoaderSwap(); when(plugins.newTask(TestSourceTask.class)).thenReturn(task); when(task.version()).thenReturn("1.0"); @@ -795,7 +794,6 @@ public void testAddRemoveExactlyOnceSourceTask() { worker.stop(); assertStatistics(worker, 0, 0); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).newTask(TestSourceTask.class); verify(task).version(); verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); @@ -818,8 +816,7 @@ public void testTaskStatusMetricsStatuses() { mockStorage(); mockFileConfigProvider(); - - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -908,7 +905,6 @@ public void testTaskStatusMetricsStatuses() { verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG); verifyTaskHeaderConverter(); - verify(plugins, times(2)).currentThreadLoader(); } @Test @@ -948,7 +944,7 @@ public void testStartTaskFailure() { Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath"); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(SampleSourceConnector.class.getName())).thenReturn(pluginLoader); @@ -979,7 +975,7 @@ public void testCleanupTasksOnStop() { mockStorage(); mockFileConfigProvider(); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); when(plugins.newTask(TestSourceTask.class)).thenReturn(task); when(task.version()).thenReturn("1.0"); @@ -1023,7 +1019,6 @@ public void testCleanupTasksOnStop() { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); 
verify(plugins).newTask(TestSourceTask.class); - verify(plugins, times(2)).currentThreadLoader(); verify(plugins).delegatingLoader(); verify(delegatingLoader).connectorLoader(SampleSourceConnector.class.getName()); @@ -1044,7 +1039,7 @@ public void testConverterOverrides() { mockStorage(); mockFileConfigProvider(); - when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + expectLoaderSwap(); Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); TaskConfig taskConfig = new TaskConfig(origProps); @@ -1071,7 +1066,6 @@ public void testConverterOverrides() { pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); worker.herder = herder; @@ -1096,7 +1090,6 @@ public void testConverterOverrides() { WorkerSourceTask instantiatedTask = sourceTaskMockedConstruction.constructed().get(0); verify(instantiatedTask).initialize(taskConfig); verify(executorService).submit(any(WorkerSourceTask.class)); - verify(plugins).delegatingLoader(); verify(delegatingLoader).connectorLoader(SampleSourceConnector.class.getName()); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); @@ -1107,7 +1100,6 @@ public void testConverterOverrides() { verify(instantiatedTask).awaitStop(anyLong()); verify(instantiatedTask).removeMetrics(); - verify(plugins, times(2)).currentThreadLoader(); verifyStorage(); } @@ -2018,4 +2010,128 @@ public void stop() { } + @Test + public void testCloseableResourcesTrackedWhileInstantiatingSourceTask() { + ClusterConfigState configState = mock(ClusterConfigState.class); + Map connProps = anyConnectorConfigMap(); + WorkerMetricsGroup.TaskStatusListener statusListener = mock(WorkerMetricsGroup.TaskStatusListener.class); + + expectLoaderSwap(); + doReturn(SampleSourceConnector.class).when(plugins).connectorClass(SampleSourceConnector.class.getName()); + when(plugins.delegatingLoader()).thenReturn(delegatingLoader); + when(delegatingLoader.connectorLoader(anyString())).thenReturn(pluginLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); + when(plugins.newTask(any())).thenReturn(task); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); + mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + + Map taskProps = new HashMap<>(connProps); + taskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + + worker = new Worker( + WORKER_ID, + new MockTime(), + plugins, + config, + offsetBackingStore, + noneConnectorClientConfigOverridePolicy + ); + worker.herder = herder; + worker.start(); + + TestCloseables closeables = new TestCloseables(); + assertTrue("Failed to start task", worker.startTask( + TASK_ID, connProps, taskProps, statusListener, closeables, + worker.new SourceTaskBuilder(TASK_ID, configState, 
statusListener, TargetState.STARTED) + )); + + Map<Class<?>, Integer> expectedCloseableResources = new HashMap<>(); + expectedCloseableResources.put(OffsetStorageReaderImpl.class, 1); + if (enableTopicCreation) { + expectedCloseableResources.put(TopicAdmin.class, 1); + } + expectedCloseableResources.put(RetryWithToleranceOperator.class, 1); + expectedCloseableResources.put(KafkaProducer.class, 1); + expectedCloseableResources.put(TransformationChain.class, 1); + expectedCloseableResources.put(ErrorHandlingMetrics.class, 1); + expectedCloseableResources.put(LogReporter.class, 1); + expectedCloseableResources.put(ConnectorOffsetBackingStore.class, 1); + expectedCloseableResources.put(taskHeaderConverter.getClass(), 1); + + expectedCloseableResources.forEach((klass, count) -> + assertEquals( + "class " + klass + " was not registered the expected number of times", + count, + closeables.registered().remove(klass) + ) + ); + assertEquals("one anonymous lambda/method reference should also have been registered", 1, closeables.registered().size()); + } + + @Test + public void testCloseableResourcesTrackedWhileInstantiatingSinkTask() { + if (enableTopicCreation) { + return; + } + + ClusterConfigState configState = mock(ClusterConfigState.class); + Map connProps = anyConnectorConfigMap(); + connProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName()); + WorkerMetricsGroup.TaskStatusListener statusListener = mock(WorkerMetricsGroup.TaskStatusListener.class); + + expectLoaderSwap(); + doReturn(SampleSinkConnector.class).when(plugins).connectorClass(SampleSinkConnector.class.getName()); + when(plugins.delegatingLoader()).thenReturn(delegatingLoader); + when(delegatingLoader.connectorLoader(anyString())).thenReturn(pluginLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); + when(plugins.newTask(any())).thenReturn(new TestSinkTask()); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); + mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + + Map taskProps = new HashMap<>(connProps); + taskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName()); + + worker = new Worker( + WORKER_ID, + new MockTime(), + plugins, + config, + offsetBackingStore, + noneConnectorClientConfigOverridePolicy + ); + worker.herder = herder; + worker.start(); + + TestCloseables closeables = new TestCloseables(); + assertTrue("Failed to start task", worker.startTask( + TASK_ID, connProps, taskProps, statusListener, closeables, + worker.new SinkTaskBuilder(TASK_ID, configState, statusListener, TargetState.STARTED) + )); + + Map<Class<?>, Integer> expectedCloseableResources = new HashMap<>(); + expectedCloseableResources.put(TransformationChain.class, 1); + expectedCloseableResources.put(RetryWithToleranceOperator.class, 1); + expectedCloseableResources.put(KafkaConsumer.class, 1); + expectedCloseableResources.put(ErrorHandlingMetrics.class, 1); + expectedCloseableResources.put(LogReporter.class, 1); + expectedCloseableResources.put(taskHeaderConverter.getClass(), 1); + + expectedCloseableResources.forEach((klass, count) -> + assertEquals( + "class " + klass + " was not registered the expected number of times", +
count, + closeables.registered().remove(klass) + ) + ); + assertEquals("one anonymous lambda/method reference should also have been registered", 1, closeables.registered().size()); + } + + private void expectLoaderSwap() { + doCallRealMethod().when(plugins).withClassLoader(any()); + } + }
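
For reviewers who want to try the new utility outside the worker, here is a minimal, self-contained sketch of the allocate/register/clear pattern that Closeables supports, assuming connect-runtime is on the classpath. The DemoResource class and the main method are hypothetical scaffolding invented for illustration; only Closeables itself, with the register/clear/close semantics shown in its Javadoc, comes from the patch above.

import org.apache.kafka.connect.util.Closeables;

public class CloseablesDemo {

    // Hypothetical resource type, used only for this sketch
    static class DemoResource implements AutoCloseable {
        private final String name;

        DemoResource(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            System.out.println("closed " + name);
        }
    }

    public static void main(String[] args) {
        try (Closeables closeables = new Closeables()) {
            DemoResource first = new DemoResource("first");
            closeables.register(first, "first demo resource");

            if (args.length == 0) {
                // Failure path: clear() is never reached, so closing the
                // Closeables instance at block exit also closes "first"
                throw new IllegalStateException("simulated failure after partial allocation");
            }

            DemoResource second = new DemoResource("second");
            closeables.register(second, "second demo resource");

            // Success path: forget both resources so they stay open for the caller
            closeables.clear();
        } catch (IllegalStateException e) {
            // "first" has already been closed by the time we get here
            System.out.println("caught: " + e.getMessage());
        }
    }
}

Running with no arguments exercises the failure path (only "first" is closed); running with any argument exercises the success path, where clear() leaves both resources open.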