Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
<suppress checks="ParameterNumber"
files="Worker(SinkTask|SourceTask|Coordinator).java"/>
<suppress checks="ParameterNumber"
files="ConfigKeyInfo.java"/>
files="(ConfigKeyInfo|Worker).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void removeMetrics() {
}

@Override
protected void close() {
protected void doClose() {
if (started) {
Utils.closeQuietly(task::stop, "source task");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -273,13 +274,14 @@ public boolean includeRecordDetailsInErrorLog() {
public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);

final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try (Closeables closeables = new Closeables()) {
final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";

try {
@SuppressWarnings("unchecked")
final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
closeables.register(transformation, "transformation " + alias + " for connector " + getString(NAME_CONFIG));
Map<String, Object> configs = originalsWithPrefix(prefix);
Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
Expand All @@ -288,17 +290,16 @@ public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
@SuppressWarnings("unchecked")
Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
closeables.register(predicate, "predicate " + predicateAlias + " for connector " + getString(NAME_CONFIG));
predicate.configure(originalsWithPrefix(predicatePrefix));
transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
} else {
transformations.add(transformation);
}
} catch (Exception e) {
throw new ConnectException(e);
}
closeables.clear();
return transformations;
}

return transformations;
}

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void stop() {
}

@Override
protected void close() {
protected void doClose() {
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
// passed in
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* if the task fails at the same time. To protect from these cases, we synchronize status updates
* using the WorkerTask's monitor.
*/
abstract class WorkerTask implements Runnable {
abstract class WorkerTask implements Runnable, AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
private static final String THREAD_NAME_PREFIX = "task-thread-";

Expand Down Expand Up @@ -153,7 +153,7 @@ public void removeMetrics() {

protected abstract void execute();

protected abstract void close();
protected abstract void doClose();

protected boolean isStopping() {
return stopping;
Expand All @@ -163,9 +163,10 @@ protected boolean isCancelled() {
return cancelled;
}

private void doClose() {
@Override
public void close() {
try {
close();
doClose();
} catch (Throwable t) {
log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t);
throw t;
Expand Down Expand Up @@ -197,7 +198,7 @@ private void doRun() throws InterruptedException {
throw t;
}
} finally {
doClose();
close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.util.Closeables;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,8 +95,13 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
}
}

KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
try (Closeables closeables = new Closeables()) {
KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
closeables.register(dlqProducer, "dead letter queue producer for task " + id);
DeadLetterQueueReporter result = new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
closeables.clear();
return result;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Contains various sensors used for monitoring errors.
*/
public class ErrorHandlingMetrics {
public class ErrorHandlingMetrics implements AutoCloseable {

private final Time time = new SystemTime();

private final ConnectMetrics.MetricGroup metricGroup;

private static final Logger log = LoggerFactory.getLogger(ErrorHandlingMetrics.class);

// metrics
private final Sensor recordProcessingFailures;
private final Sensor recordProcessingErrors;
Expand Down Expand Up @@ -138,4 +142,13 @@ public void recordErrorTimestamp() {
public ConnectMetrics.MetricGroup metricGroup() {
return metricGroup;
}

/**
 * Close the error-handling metrics group for this task, delegating to
 * {@link ConnectMetrics.MetricGroup#close()}. Invoked when the owning task is closed so
 * that the task's error metrics are removed along with it.
 */
@Override
public void close() {
    log.debug("Removing error handling metrics of group {}", metricGroup.groupId());
    metricGroup.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* An {@link OffsetBackingStore} with support for reading from and writing to a worker-global
* offset backing store and/or a connector-specific offset backing store.
*/
public class ConnectorOffsetBackingStore implements OffsetBackingStore {
public class ConnectorOffsetBackingStore implements OffsetBackingStore, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);

Expand Down Expand Up @@ -187,6 +187,14 @@ public void stop() {
connectorStoreAdmin.ifPresent(TopicAdmin::close);
}

/**
 * Synonym for {@link #stop()}, for use with try-with-resources blocks.
 * <p>
 * Simply delegates to {@link #stop()}; allows this store to be managed by any
 * {@link AutoCloseable}-aware utility.
 */
@Override
public void close() {
    stop();
}

/**
* Get the offset values for the specified keys.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* This class allows developers to easily track multiple {@link AutoCloseable} objects allocated
 * at different times and, if necessary, {@link AutoCloseable#close} them.
* <p>
* Users should create an instance in a try-with-resources block,
* {@link #register(AutoCloseable, String) register} resources that they allocate inside that
* block, and optionally {@link #clear() clear} those resources before exiting the block if they
* will be used outside it.
* <p>
* For example:
* <pre> {@code
* try (Closeables closeables = new Closeables()) {
* // Switch to the connector's classloader if something goes wrong and we have to close the
* // resources we've allocated for it
* closeables.useLoader(connectorClassLoader);
*
* Converter keyConverter = createConverter(KEY_CONVERTER_CONFIG, connectorConfig);
* // Close the key converter if any of the next steps fails
* closeables.register(keyConverter, "task key converter");
*
* Converter valueConverter = createConverter(VALUE_CONVERTER_CONFIG, connectorConfig);
* // Close the value converter if any of the next steps fails
 *     closeables.register(valueConverter, "task value converter");
*
* HeaderConverter headerConverter = createHeaderConverter(connectorConfig);
* // Close the header converter if any of the next steps fails
 *     closeables.register(headerConverter, "task header converter");
*
* WorkerTask workerTask = createWorkerTask(keyConverter, valueConverter, headerConverter);
*
* // We've successfully created our task; clear our closeables since we want to keep using
* // these
* closeables.clear();
* }
* }</pre>
*
* Thread safety: this class is not thread-safe and is only intended to be accessed from a single
* thread per instance.
*/
public class Closeables implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(Closeables.class);

    // Log names keyed by resource identity (not equals), so that two distinct-but-equal
    // resources are tracked separately and the same instance registered twice is closed once
    private final Map<AutoCloseable, String> closeables;
    // Registration order, so that close() can release resources in LIFO
    // (reverse-registration) order: later resources may depend on earlier ones,
    // mirroring the semantics of try-with-resources
    private final List<AutoCloseable> registrationOrder;
    private ClassLoader loader;

    public Closeables() {
        closeables = new IdentityHashMap<>();
        registrationOrder = new ArrayList<>();
    }

    /**
     * Register a resource to be {@link AutoCloseable#close() closed} when this {@link Closeables}
     * object is {@link #close() closed}. Resources are closed in the reverse of the order in
     * which they were registered.
     * @param closeable the closeable resource to track; if null, will be silently ignored
     * @param name a description of the closeable resource to use for logging; may not be null
     */
    public void register(AutoCloseable closeable, String name) {
        Objects.requireNonNull(name, "name may not be null");
        if (closeable == null) {
            log.trace("Ignoring null closeable: {}", name);
        } else {
            log.trace("Registering closeable {}: {}", name, closeable);
            // Only record the order the first time this exact instance is seen;
            // a re-registration just refreshes the log name
            if (this.closeables.put(closeable, name) == null) {
                this.registrationOrder.add(closeable);
            }
        }
    }

    /**
     * Set a {@link ClassLoader} to switch to when {@link AutoCloseable#close() closing}
     * resources that have been {@link #register(AutoCloseable, String) registered}.
     * <p>
     * May not be invoked more than once.
     * @param loader the loader to use; may not be null
     * @throws IllegalStateException if invoked more than once
     */
    public void useLoader(ClassLoader loader) {
        if (this.loader != null) {
            throw new IllegalStateException("May only define classloader once");
        }
        Objects.requireNonNull(loader, "class loader may not be null");
        this.loader = loader;
    }

    /**
     * Forget any resources that have been {@link #register(AutoCloseable, String) registered}
     * before this call. Note that if a {@link ClassLoader} has been set via
     * {@link #useLoader(ClassLoader)}, it will remain set, and subsequent calls to
     * {@link #useLoader(ClassLoader)} will still fail.
     */
    public void clear() {
        closeables.clear();
        registrationOrder.clear();
    }

    /**
     * {@link AutoCloseable#close() Close} all resources that have been
     * {@link #register(AutoCloseable, String) registered}, using the {@link ClassLoader} set via
     * {@link #useLoader(ClassLoader)} (if one has been set), except those that have been forgotten
     * by a call to {@link #clear()}. Resources are closed in the reverse of the order in which
     * they were registered.
     * <p>
     * If any call to {@link AutoCloseable#close()} fails, the exception is caught, logged, and not
     * propagated to the caller.
     */
    @Override
    public void close() {
        if (closeables.isEmpty())
            return;

        try (Utils.UncheckedCloseable loaderSwap = maybeSwapLoaders()) {
            // Close in reverse registration order so that dependent (later) resources are
            // released before the resources they may rely on
            for (int i = registrationOrder.size() - 1; i >= 0; i--) {
                AutoCloseable closeable = registrationOrder.get(i);
                Utils.closeQuietly(closeable, closeables.get(closeable));
            }
            clear();
        }
    }

    // Returns a no-op closeable when no loader has been set; otherwise swaps the context
    // classloader and restores it when the returned closeable is closed
    private Utils.UncheckedCloseable maybeSwapLoaders() {
        if (loader != null) {
            return new LoaderSwap(loader)::close;
        } else {
            return () -> { };
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void testSinkTasksCloseErrorReporters() throws Exception {

workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
workerSinkTask.doClose();

PowerMock.verifyAll();
}
Expand All @@ -271,7 +271,7 @@ public void testSourceTasksCloseErrorReporters() {
PowerMock.replayAll();

workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
workerSourceTask.doClose();

PowerMock.verifyAll();
}
Expand Down Expand Up @@ -299,7 +299,7 @@ public void testCloseErrorReportersExceptionPropagation() {
PowerMock.replayAll();

workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
workerSourceTask.doClose();

PowerMock.verifyAll();
}
Expand Down
Loading