Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.mockito.stubbing.Answer;

/**
Expand Down Expand Up @@ -172,6 +173,10 @@ private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController exe

private static Answer<Future<?>> answerNow() {
return invocationOnMock -> {
// this method executes everything in the caller thread
// this messes up assertions that verify
// that a thread is part of only a threadpool
ThreadRegistry.forceClearRegistrationForTests(Thread.currentThread().getId());

Runnable task = invocationOnMock.getArgument(0);
task.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
@Override
public synchronized void start() {
setDaemon(true);
ThreadRegistry.register("BookieThread", 0);
ThreadRegistry.register("BookieThread", true);
if (LOG.isDebugEnabled()) {
LOG.debug("I'm starting a bookie with journal directories {}",
journalDirectories.stream().map(File::getName).collect(Collectors.joining(", ")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public ForceWriteThread(Thread threadToNotifyOnEx,
@Override
public void run() {
LOG.info("ForceWrite Thread started");
ThreadRegistry.register(super.getName(), 0);
ThreadRegistry.register(super.getName());

if (conf.isBusyWaitEnabled()) {
try {
Expand Down Expand Up @@ -939,7 +939,7 @@ public int getJournalQueueLength() {
@Override
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
ThreadRegistry.register(journalThreadName, 0);
ThreadRegistry.register(journalThreadName);

if (conf.isBusyWaitEnabled()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ public SyncThread(ServerConfiguration conf,
this.checkpointSource = checkpointSource;
this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(executorName));
this.syncExecutorTime = statsLogger.getThreadScopedCounter("sync-thread-time");
this.executor.submit(() -> ThreadRegistry.register(executorName, 0));
}

@VisibleForTesting
static ScheduledExecutorService newExecutor() {
return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(executorName) {
@Override
protected Thread newThread(Runnable r, String name) {
return super.newThread(ThreadRegistry.registerThread(r, executorName), name);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage

private static String dbStoragerExecutorName = "db-storage";
private final ExecutorService executor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory(dbStoragerExecutorName));
new DefaultThreadFactory(dbStoragerExecutorName) {
@Override
protected Thread newThread(Runnable r, String name) {
return super.newThread(ThreadRegistry.registerThread(r, dbStoragerExecutorName), name);
}
});

// Executor used to for db index cleanup
private final ScheduledExecutorService cleanupExecutor = Executors
Expand Down Expand Up @@ -215,7 +220,6 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
flushExecutorTime = ledgerIndexDirStatsLogger.getThreadScopedCounter("db-storage-thread-time");

executor.submit(() -> {
ThreadRegistry.register(dbStoragerExecutorName, 0);
// ensure the metric gets registered on start-up as this thread only executes
// when the write cache is full which may not happen or not for a long time
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -122,7 +123,12 @@ class BookieNettyServer {

if (!conf.isDisableServerSocketBind()) {
this.eventLoopGroup = EventLoopUtil.getServerEventLoopGroup(conf,
new DefaultThreadFactory("bookie-io"));
new DefaultThreadFactory("bookie-io") {
@Override
protected Thread newThread(Runnable r, String name) {
return super.newThread(ThreadRegistry.registerThread(r, "bookie-id"), name);
}
});
this.acceptorGroup = EventLoopUtil.getServerAcceptorGroup(conf,
new DefaultThreadFactory("bookie-acceptor"));
allChannels = new CleanupChannelGroup(eventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.PortManager;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class LedgerStorageCheckpointTest {

@Before
public void setUp() throws Exception {
ThreadRegistry.clear();
LOG.info("Setting up test {}", getClass());
PowerMockito.mockStatic(Executors.class);

Expand All @@ -116,6 +118,7 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
ThreadRegistry.clear();
LOG.info("TearDown");
Exception tearDownException = null;
// stop zookeeper service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -225,6 +226,11 @@ public void tearDown() throws Exception {
}
}

@After
public void clearMetricsThreadRegistry() throws Exception {
ThreadRegistry.clear();
}

/**
* Start zookeeper cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,52 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* For mapping thread ids to thread pools and threads within those pools
* or just for lone named threads. Thread scoped metrics add labels to
* metrics by retrieving the ThreadPoolThread object from this registry.
* For flexibility, this registry is not based on TLS.
*/
public class ThreadRegistry {
private static Logger logger = LoggerFactory.getLogger(ThreadRegistry.class);
private static ConcurrentMap<Long, ThreadPoolThread> threadPoolMap = new ConcurrentHashMap<>();
private static ConcurrentMap<String, Integer> threadPoolThreadMap = new ConcurrentHashMap<>();

/*
Threads can register themselves as their first act before carrying out
any work. By calling this method, the ThreadPoolThread is incremented
for the given thread pool.
*/
public static void register(String threadPool) {
register(threadPool, false);
}

public static void register(String threadPool, boolean force) {
Integer threadPoolThread = threadPoolThreadMap.compute(threadPool, (k, v) -> v == null ? 0 : v + 1);
if (force) {
threadPoolMap.remove(Thread.currentThread().getId());
}
register(threadPool, threadPoolThread, Thread.currentThread().getId());
}

/**
* In some tests we run in the same thread activities that should
* run in different threads from different thread-pools
* this would trigger assertions to fail.
* This is a convenience method to work around such cases.
* This method shouldn't be used in production code.
*/
public static void forceClearRegistrationForTests(long threadId) {
threadPoolMap.compute(threadId, (id, value) -> {
if (value != null) {
logger.info("Forcibly clearing registry entry {} for thread id {}", value, id);
}
return null;
});
}

/*
Threads can register themselves as their first act before carrying out
Expand All @@ -37,17 +75,30 @@ public static void register(String threadPool, int threadPoolThread) {

/*
Thread factories can register a thread by its id.
The assumption is that one thread belongs only to one threadpool.
The doesn't hold in tests, in which we use mock Executors that
run the code in the same thread as the caller
*/
public static void register(String threadPool, int threadPoolThread, long threadId) {
ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId);
threadPoolMap.put(threadId, tpt);
ThreadPoolThread previous = threadPoolMap.put(threadId, tpt);
if (previous != null) {
throw new IllegalStateException("Thread " + threadId + " was already registered in thread pool "
+ previous.threadPool + " as thread " + previous.ordinal + " with threadId " + previous.threadId
+ " trying to overwrite with " + threadPool + " and ordinal " + threadPoolThread);
}
}

public static Runnable registerThread(Runnable runnable, String threadPool) {
return new RegisteredRunnable(threadPool, runnable);
}

/*
Clears all stored thread state.
*/
public static void clear() {
threadPoolMap.clear();
threadPoolThreadMap.clear();
}

/*
Expand Down Expand Up @@ -79,4 +130,20 @@ public int getOrdinal() {
return ordinal;
}
}

private static class RegisteredRunnable implements Runnable {
private final String threadPool;
private final Runnable runnable;

public RegisteredRunnable(String threadPool, Runnable runnable) {
this.threadPool = threadPool;
this.runnable = runnable;
}

@Override
public void run() {
register(threadPool);
runnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,9 @@ protected void onRemoval(LocalData value) throws Exception {
}
};
}

@Override
public String toString() {
return "DataSketchesOpStatsLogger{labels=" + labels + ", id=" + System.identityHashCode(this) + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* OpStatsLogger implementation that lazily registers OpStatsLoggers per thread
* with added labels for the threadpool/thresd name and thread no.
*/
public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger {

private static Logger logger = LoggerFactory.getLogger(ThreadScopedDataSketchesStatsLogger.class);

private ThreadLocal<DataSketchesOpStatsLogger> statsLoggers;
private DataSketchesOpStatsLogger defaultStatsLogger;
private Map<String, String> originalLabels;
Expand Down Expand Up @@ -95,16 +99,34 @@ private DataSketchesOpStatsLogger getStatsLogger() {
if (!statsLogger.isThreadInitialized()) {
ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get();
if (tpt == null) {
logger.warn("Thread {} was not registered in the thread registry. Using default stats logger {}.",
Thread.currentThread(), defaultStatsLogger);
statsLoggers.set(defaultStatsLogger);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger);
DataSketchesOpStatsLogger previous = provider.opStats
.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger);
// If we overwrite a logger, metrics will not be collected correctly
if (previous != null && previous != defaultStatsLogger) {
logger.error("Invalid state for thead " + Thread.currentThread() + ". Overwrote a stats logger."
+ "New is {}, previous was {}",
defaultStatsLogger, previous);
throw new IllegalStateException("Invalid state. Overwrote a stats logger.");
}
return defaultStatsLogger;
} else {
Map<String, String> threadScopedlabels = new HashMap<>(originalLabels);
threadScopedlabels.put("threadPool", tpt.getThreadPool());
threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal()));

statsLogger.initializeThread(threadScopedlabels);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger);
DataSketchesOpStatsLogger previous = provider.opStats
.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger);
// If we overwrite a logger, metrics will not be collected correctly
if (previous != null && previous != statsLogger) {
logger.error("Invalid state for thead " + Thread.currentThread() + ". Overwrote a stats logger."
+ "New is {}, previous was {}",
defaultStatsLogger, previous);
throw new IllegalStateException("Invalid state. Overwrote a stats logger.");
}
}
}

Expand Down