From afd9beb05ed5f818da5e50c3aad380c5343786ac Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 7 Jan 2020 15:32:47 +0530 Subject: [PATCH 1/2] KAFKA-9375 Add thread names to kafka connect --- .../kafka/common/utils}/ThreadUtils.java | 2 +- .../kafka/common/utils/ThreadUtilsTest.java | 93 +++++++++++++++++++ .../kafka/connect/mirror/Scheduler.java | 2 +- .../runtime/SourceTaskOffsetCommitter.java | 4 +- .../distributed/DistributedHerder.java | 23 +++-- .../storage/MemoryOffsetBackingStore.java | 4 +- .../distributed/DistributedHerderTest.java | 21 +++++ .../storage/FileOffsetBackingStoreTest.java | 13 +++ .../kafka/trogdor/agent/WorkerManager.java | 2 +- .../trogdor/coordinator/NodeManager.java | 2 +- .../trogdor/coordinator/TaskManager.java | 2 +- .../kafka/trogdor/rest/JsonRestServer.java | 2 +- .../workload/ConnectionStressWorker.java | 2 +- .../trogdor/workload/ConsumeBenchWorker.java | 2 +- .../workload/ExternalCommandWorker.java | 2 +- .../trogdor/workload/ProduceBenchWorker.java | 2 +- .../trogdor/workload/RoundTripWorker.java | 2 +- .../workload/SustainedConnectionWorker.java | 2 +- .../trogdor/common/MiniTrogdorCluster.java | 1 + .../kafka/trogdor/task/SampleTaskWorker.java | 2 +- 20 files changed, 160 insertions(+), 25 deletions(-) rename {tools/src/main/java/org/apache/kafka/trogdor/common => clients/src/main/java/org/apache/kafka/common/utils}/ThreadUtils.java (98%) create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java similarity index 98% rename from tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java rename to clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java index 7e0285648ad34..750c8d7a89f41 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.trogdor.common; +package org.apache.kafka.common.utils; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java new file mode 100644 index 0000000000000..2001a5cc1bd05 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ThreadFactory; + +public class ThreadUtilsTest { + + private static final Runnable EMPTY_RUNNABLE = new Runnable() { + @Override + public void run() { + } + }; + private static final String THREAD_NAME = "ThreadName"; + private static final String THREAD_NAME_WITH_NUMBER = THREAD_NAME + "%d"; + + + @Test + public void testThreadNameWithoutNumberNoDemon() { + Assert.assertEquals(ThreadUtils.createThreadFactory(THREAD_NAME, false). + newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME); + } + + @Test + public void testThreadNameWithoutNumberDemon() { + Thread daemonThread = ThreadUtils.createThreadFactory(THREAD_NAME, true).newThread(EMPTY_RUNNABLE); + try { + Assert.assertEquals(daemonThread.getName(), THREAD_NAME); + Assert.assertTrue(daemonThread.isDaemon()); + } finally { + try { + daemonThread.join(); + } catch (InterruptedException e) { + // can be ignored + e.printStackTrace(); + } + } + } + + @Test + public void testThreadNameWithNumberNoDemon() { + ThreadFactory localThreadFactory = ThreadUtils.createThreadFactory(THREAD_NAME_WITH_NUMBER, false); + Assert.assertEquals(localThreadFactory.newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME + "1"); + Assert.assertEquals(localThreadFactory.newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME + "2"); + } + + @Test + public void testThreadNameWithNumberDemon() { + ThreadFactory localThreadFactory = ThreadUtils.createThreadFactory(THREAD_NAME_WITH_NUMBER, true); + Thread daemonThread1 = localThreadFactory.newThread(EMPTY_RUNNABLE); + Thread daemonThread2 = localThreadFactory.newThread(EMPTY_RUNNABLE); + + try { + Assert.assertEquals(daemonThread1.getName(), THREAD_NAME + "1"); + Assert.assertTrue(daemonThread1.isDaemon()); + } finally { + try { + daemonThread1.join(); + } catch (InterruptedException e) { + // can be ignored + e.printStackTrace(); + } + } + try { + Assert.assertEquals(daemonThread2.getName(), THREAD_NAME + "2"); + Assert.assertTrue(daemonThread2.isDaemon()); + } finally { + try { + daemonThread2.join(); + } catch (InterruptedException e) { + // can be ignored + e.printStackTrace(); + } + } + } +} diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index cac9a80c0fd89..203c01d0a1bb4 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -104,7 +104,7 @@ private void run(Task task, String description) { } private void executeThread(Task task, String description) { - Thread.currentThread().setName(description); + Thread.currentThread().setName(name + "-" + description); if (closed) { log.info("{} skipping task due to shutdown: {}", name, description); return; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java index 8e8d3fa056cfd..012c7074ad900 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,8 @@ class SourceTaskOffsetCommitter { } public SourceTaskOffsetCommitter(WorkerConfig config) { - this(config, Executors.newSingleThreadScheduledExecutor(), + this(config, Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory( + SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", false)), new ConcurrentHashMap>()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index f3861ddd59bf9..d2186f47fb1c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -60,6 +60,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import javax.crypto.KeyGenerator; @@ -80,7 +81,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -227,15 +227,18 @@ public DistributedHerder(DistributedConfig config, : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(time), time, clientId, logContext); - this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque(1), - new ThreadFactory() { - @Override - public Thread newThread(Runnable herder) { - return new Thread(herder, "DistributedHerder-" + clientId); - } - }); - this.forwardRequestExecutor = Executors.newSingleThreadExecutor(); - this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE); + this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque(1), + ThreadUtils.createThreadFactory( + this.getClass().getSimpleName() + "-" + clientId + "-%d", false)); + + this.forwardRequestExecutor = Executors.newFixedThreadPool(1, + ThreadUtils.createThreadFactory( + "ForwardRequestExecutor-" + clientId + "-%d", false)); + this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE, + ThreadUtils.createThreadFactory( + "StartAndStopExecutor-" + clientId + "-%d", false)); this.config = config; stopping = new AtomicBoolean(false); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 72439e7d687b3..ccec12a864b2b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,8 @@ public void configure(WorkerConfig config) { @Override public void start() { - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory( + this.getClass().getSimpleName() + "-%d", false)); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index ddbd560fa5c56..b1204ea926550 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -59,6 +59,7 @@ import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +68,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; import java.util.ArrayList; import java.util.Arrays; @@ -76,6 +78,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -161,6 +164,11 @@ public class DistributedHerderTest { private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; + private static final Runnable EMPTY_RUNNABLE = new Runnable() { + @Override + public void run() { + } + }; @Mock private ConfigBackingStore configBackingStore; @Mock private StatusBackingStore statusBackingStore; @@ -187,6 +195,7 @@ public class DistributedHerderTest { private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); + @Before public void setUp() throws Exception { time = new MockTime(); @@ -1764,6 +1773,18 @@ public void testInconsistentConfigs() { } + @Test + public void testThreadNames() { + Assert.assertTrue(Whitebox.getInternalState(herder, "herderExecutor"). + getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith(DistributedHerder.class.getSimpleName())); + + Assert.assertTrue(Whitebox.getInternalState(herder, "forwardRequestExecutor"). + getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("ForwardRequestExecutor")); + + Assert.assertTrue(Whitebox.getInternalState(herder, "startAndStopExecutor"). + getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor")); + } + private void expectRebalance(final long offset, final List assignedConnectors, final List assignedTasks) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java index 1fdb91a2dcb2d..cc0ccc2120ef0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.util.Callback; import org.easymock.EasyMock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.powermock.api.easymock.PowerMock; @@ -30,6 +31,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; import static org.junit.Assert.assertEquals; @@ -41,6 +43,11 @@ public class FileOffsetBackingStoreTest { File tempFile; private static Map firstSet = new HashMap<>(); + private static final Runnable EMPTY_RUNNABLE = new Runnable() { + @Override + public void run() { + } + }; static { firstSet.put(buffer("key"), buffer("value")); @@ -100,6 +107,12 @@ public void testSaveRestore() throws Exception { PowerMock.verifyAll(); } + @Test + public void testThreadName() { + Assert.assertTrue(((ThreadPoolExecutor) store.executor).getThreadFactory() + .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); + } + private static ByteBuffer buffer(String v) { return ByteBuffer.wrap(v.getBytes()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index 5ef9299476ca9..4510e1bb9221e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -21,10 +21,10 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 97ad4ae1506f3..9dc379cde0018 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -43,9 +43,9 @@ package org.apache.kafka.trogdor.coordinator; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.trogdor.agent.AgentClient; import org.apache.kafka.trogdor.common.Node; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 60e2b1e8e9f95..69d2ddae7637c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -25,12 +25,12 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java index b69b85c42c9ec..d1f1948e2fba1 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java @@ -20,8 +20,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.trogdor.common.JsonUtil; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.Handler; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index cef8d2d0d569d..c42869b5fcca5 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -36,11 +36,11 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index 1e80209cc9fa2..a397f849ac123 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -28,11 +28,11 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java index 9db7f18e24722..acc195e37ed17 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 0bdb8ba6a208a..770a174af7efa 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -29,10 +29,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 7eab2de6a9349..643d22cd73c6e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -35,11 +35,11 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java index 6ba9916e6d0f0..ddc248fc44454 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java @@ -31,10 +31,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java index 0c9aba2cc9394..1e84d607a5489 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java @@ -18,6 +18,7 @@ package org.apache.kafka.trogdor.common; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.agent.Agent; import org.apache.kafka.trogdor.agent.AgentClient; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java index 404817a708982..06339d2ae4eb5 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java @@ -19,8 +19,8 @@ import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.ThreadUtils; import java.util.concurrent.Executors; import java.util.concurrent.Future; From 54b80f3ae15a2bd3ed92b5fb1d5b6b68938a1657 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Thu, 30 Jan 2020 19:39:44 +0530 Subject: [PATCH 2/2] KAFKA-9375: review comments --- .../kafka/common/utils/ThreadUtilsTest.java | 32 ++++++++----------- .../distributed/DistributedHerderTest.java | 12 +++---- .../storage/FileOffsetBackingStoreTest.java | 9 ++---- 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java index 2001a5cc1bd05..c49447c17d20d 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ThreadUtilsTest.java @@ -16,17 +16,16 @@ */ package org.apache.kafka.common.utils; -import org.junit.Assert; import org.junit.Test; import java.util.concurrent.ThreadFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class ThreadUtilsTest { - private static final Runnable EMPTY_RUNNABLE = new Runnable() { - @Override - public void run() { - } + private static final Runnable EMPTY_RUNNABLE = () -> { }; private static final String THREAD_NAME = "ThreadName"; private static final String THREAD_NAME_WITH_NUMBER = THREAD_NAME + "%d"; @@ -34,22 +33,21 @@ public void run() { @Test public void testThreadNameWithoutNumberNoDemon() { - Assert.assertEquals(ThreadUtils.createThreadFactory(THREAD_NAME, false). - newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME); + assertEquals(THREAD_NAME, ThreadUtils.createThreadFactory(THREAD_NAME, false). + newThread(EMPTY_RUNNABLE).getName()); } @Test public void testThreadNameWithoutNumberDemon() { Thread daemonThread = ThreadUtils.createThreadFactory(THREAD_NAME, true).newThread(EMPTY_RUNNABLE); try { - Assert.assertEquals(daemonThread.getName(), THREAD_NAME); - Assert.assertTrue(daemonThread.isDaemon()); + assertEquals(THREAD_NAME, daemonThread.getName()); + assertTrue(daemonThread.isDaemon()); } finally { try { daemonThread.join(); } catch (InterruptedException e) { // can be ignored - e.printStackTrace(); } } } @@ -57,8 +55,8 @@ public void testThreadNameWithoutNumberDemon() { @Test public void testThreadNameWithNumberNoDemon() { ThreadFactory localThreadFactory = ThreadUtils.createThreadFactory(THREAD_NAME_WITH_NUMBER, false); - Assert.assertEquals(localThreadFactory.newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME + "1"); - Assert.assertEquals(localThreadFactory.newThread(EMPTY_RUNNABLE).getName(), THREAD_NAME + "2"); + assertEquals(THREAD_NAME + "1", localThreadFactory.newThread(EMPTY_RUNNABLE).getName()); + assertEquals(THREAD_NAME + "2", localThreadFactory.newThread(EMPTY_RUNNABLE).getName()); } @Test @@ -68,25 +66,23 @@ public void testThreadNameWithNumberDemon() { Thread daemonThread2 = localThreadFactory.newThread(EMPTY_RUNNABLE); try { - Assert.assertEquals(daemonThread1.getName(), THREAD_NAME + "1"); - Assert.assertTrue(daemonThread1.isDaemon()); + assertEquals(THREAD_NAME + "1", daemonThread1.getName()); + assertTrue(daemonThread1.isDaemon()); } finally { try { daemonThread1.join(); } catch (InterruptedException e) { // can be ignored - e.printStackTrace(); } } try { - Assert.assertEquals(daemonThread2.getName(), THREAD_NAME + "2"); - Assert.assertTrue(daemonThread2.isDaemon()); + assertEquals(THREAD_NAME + "2", daemonThread2.getName()); + assertTrue(daemonThread2.isDaemon()); } finally { try { daemonThread2.join(); } catch (InterruptedException e) { // can be ignored - e.printStackTrace(); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index b1204ea926550..74693f890e2c1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -59,7 +59,6 @@ import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -164,10 +163,7 @@ public class DistributedHerderTest { private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; - private static final Runnable EMPTY_RUNNABLE = new Runnable() { - @Override - public void run() { - } + private static final Runnable EMPTY_RUNNABLE = () -> { }; @Mock private ConfigBackingStore configBackingStore; @@ -1775,13 +1771,13 @@ public void testInconsistentConfigs() { @Test public void testThreadNames() { - Assert.assertTrue(Whitebox.getInternalState(herder, "herderExecutor"). + assertTrue(Whitebox.getInternalState(herder, "herderExecutor"). getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith(DistributedHerder.class.getSimpleName())); - Assert.assertTrue(Whitebox.getInternalState(herder, "forwardRequestExecutor"). + assertTrue(Whitebox.getInternalState(herder, "forwardRequestExecutor"). getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("ForwardRequestExecutor")); - Assert.assertTrue(Whitebox.getInternalState(herder, "startAndStopExecutor"). + assertTrue(Whitebox.getInternalState(herder, "startAndStopExecutor"). getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor")); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java index cc0ccc2120ef0..aac7cb0c3b909 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.connect.util.Callback; import org.easymock.EasyMock; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.powermock.api.easymock.PowerMock; @@ -34,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class FileOffsetBackingStoreTest { @@ -43,10 +43,7 @@ public class FileOffsetBackingStoreTest { File tempFile; private static Map firstSet = new HashMap<>(); - private static final Runnable EMPTY_RUNNABLE = new Runnable() { - @Override - public void run() { - } + private static final Runnable EMPTY_RUNNABLE = () -> { }; static { @@ -109,7 +106,7 @@ public void testSaveRestore() throws Exception { @Test public void testThreadName() { - Assert.assertTrue(((ThreadPoolExecutor) store.executor).getThreadFactory() + assertTrue(((ThreadPoolExecutor) store.executor).getThreadFactory() .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); }