Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.kafka.trogdor.common;
package org.apache.kafka.common.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;

import org.junit.Test;

import java.util.concurrent.ThreadFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ThreadUtilsTest {

private static final Runnable EMPTY_RUNNABLE = () -> {
};
private static final String THREAD_NAME = "ThreadName";
private static final String THREAD_NAME_WITH_NUMBER = THREAD_NAME + "%d";


@Test
public void testThreadNameWithoutNumberNoDemon() {
assertEquals(THREAD_NAME, ThreadUtils.createThreadFactory(THREAD_NAME, false).
newThread(EMPTY_RUNNABLE).getName());
}

@Test
public void testThreadNameWithoutNumberDemon() {
Thread daemonThread = ThreadUtils.createThreadFactory(THREAD_NAME, true).newThread(EMPTY_RUNNABLE);
try {
assertEquals(THREAD_NAME, daemonThread.getName());
assertTrue(daemonThread.isDaemon());
} finally {
try {
daemonThread.join();
} catch (InterruptedException e) {
// can be ignored
}
}
}

@Test
public void testThreadNameWithNumberNoDemon() {
ThreadFactory localThreadFactory = ThreadUtils.createThreadFactory(THREAD_NAME_WITH_NUMBER, false);
assertEquals(THREAD_NAME + "1", localThreadFactory.newThread(EMPTY_RUNNABLE).getName());
assertEquals(THREAD_NAME + "2", localThreadFactory.newThread(EMPTY_RUNNABLE).getName());
}

@Test
public void testThreadNameWithNumberDemon() {
ThreadFactory localThreadFactory = ThreadUtils.createThreadFactory(THREAD_NAME_WITH_NUMBER, true);
Thread daemonThread1 = localThreadFactory.newThread(EMPTY_RUNNABLE);
Thread daemonThread2 = localThreadFactory.newThread(EMPTY_RUNNABLE);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need daemonThread2 in this test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to check if 2 daemonThreads are appropriately numbered.


try {
assertEquals(THREAD_NAME + "1", daemonThread1.getName());
assertTrue(daemonThread1.isDaemon());
} finally {
try {
daemonThread1.join();
} catch (InterruptedException e) {
// can be ignored
}
}
try {
assertEquals(THREAD_NAME + "2", daemonThread2.getName());
assertTrue(daemonThread2.isDaemon());
} finally {
try {
daemonThread2.join();
} catch (InterruptedException e) {
// can be ignored
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void run(Task task, String description) {
}

private void executeThread(Task task, String description) {
Thread.currentThread().setName(description);
Thread.currentThread().setName(name + "-" + description);
if (closed) {
log.info("{} skipping task due to shutdown: {}", name, description);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,7 +61,8 @@ class SourceTaskOffsetCommitter {
}

public SourceTaskOffsetCommitter(WorkerConfig config) {
this(config, Executors.newSingleThreadScheduledExecutor(),
this(config, Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(
SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", false)),
new ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;

import javax.crypto.KeyGenerator;
Expand All @@ -80,7 +81,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -227,15 +227,18 @@ public DistributedHerder(DistributedConfig config,
: new WorkerGroupMember(config, restUrl, this.configBackingStore,
new RebalanceListener(time), time, clientId, logContext);

this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1),
new ThreadFactory() {
@Override
public Thread newThread(Runnable herder) {
return new Thread(herder, "DistributedHerder-" + clientId);
}
});
this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(1),
ThreadUtils.createThreadFactory(
this.getClass().getSimpleName() + "-" + clientId + "-%d", false));

this.forwardRequestExecutor = Executors.newFixedThreadPool(1,
ThreadUtils.createThreadFactory(
"ForwardRequestExecutor-" + clientId + "-%d", false));
this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE,
ThreadUtils.createThreadFactory(
"StartAndStopExecutor-" + clientId + "-%d", false));
this.config = config;

stopping = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,7 +54,8 @@ public void configure(WorkerConfig config) {

@Override
public void start() {
executor = Executors.newSingleThreadExecutor();
executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory(
this.getClass().getSimpleName() + "-%d", false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -76,6 +77,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -161,6 +163,8 @@ public class DistributedHerderTest {

private static final String WORKER_ID = "localhost:8083";
private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
private static final Runnable EMPTY_RUNNABLE = () -> {
};

@Mock private ConfigBackingStore configBackingStore;
@Mock private StatusBackingStore statusBackingStore;
Expand All @@ -187,6 +191,7 @@ public class DistributedHerderTest {
private final ConnectorClientConfigOverridePolicy
noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();


@Before
public void setUp() throws Exception {
time = new MockTime();
Expand Down Expand Up @@ -1764,6 +1769,18 @@ public void testInconsistentConfigs() {
}


@Test
public void testThreadNames() {
assertTrue(Whitebox.<ThreadPoolExecutor>getInternalState(herder, "herderExecutor").
getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith(DistributedHerder.class.getSimpleName()));

assertTrue(Whitebox.<ThreadPoolExecutor>getInternalState(herder, "forwardRequestExecutor").
getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("ForwardRequestExecutor"));

assertTrue(Whitebox.<ThreadPoolExecutor>getInternalState(herder, "startAndStopExecutor").
getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor"));
}

private void expectRebalance(final long offset,
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class FileOffsetBackingStoreTest {

Expand All @@ -41,6 +43,8 @@ public class FileOffsetBackingStoreTest {
File tempFile;

private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
private static final Runnable EMPTY_RUNNABLE = () -> {
};

static {
firstSet.put(buffer("key"), buffer("value"));
Expand Down Expand Up @@ -100,6 +104,12 @@ public void testSaveRestore() throws Exception {
PowerMock.verifyAll();
}

@Test
public void testThreadName() {
assertTrue(((ThreadPoolExecutor) store.executor).getThreadFactory()
.newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
}

private static ByteBuffer buffer(String v) {
return ByteBuffer.wrap(v.getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@

package org.apache.kafka.trogdor.coordinator;

import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
Expand Down
Loading