Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ddprof-lib/src/main/cpp/os_dd.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ class OS : public ::OS {
static void uninstallTlsPrimeSignalHandler(int signal_num);
static void enumerateThreadIds(const std::function<void(int)>& callback);
static void signalThread(int tid, int signum);
static bool startThreadDirectoryWatcher(const std::function<void(int)>& on_new_thread, const std::function<void(int)>& on_dead_thread);
static int getThreadCount();
static void stopThreadDirectoryWatcher();
};
}
#endif // _OS_DD_H
172 changes: 1 addition & 171 deletions ddprof-lib/src/main/cpp/os_linux_dd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <sys/inotify.h>
#include <dirent.h>
#include <errno.h>
#include <pthread.h>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>

#ifndef __musl__
#include <malloc.h>
Expand All @@ -25,43 +22,6 @@
#define MMAP_SYSCALL __NR_mmap2
#endif

// Thread directory watcher state
static std::atomic<bool> g_watcher_running{false};
static std::atomic<int> g_watcher_fd{-1};
static pthread_t g_watcher_thread;
static std::atomic<bool> g_watcher_thread_created{false};
static std::function<void(int)> g_on_new_thread;
static std::function<void(int)> g_on_dead_thread;

static void* threadDirectoryWatcherLoop(void* arg);

// atfork child hook: returns every piece of watcher state to "not started"
// inside a freshly forked child process.
static void resetWatcherStateInChild() {
  // The child does not have the watcher thread, so it must not believe one is
  // running or joinable; a later cleanup attempt would otherwise deadlock.
  g_watcher_thread_created.store(false);
  g_watcher_running.store(false);

  // Release the inotify descriptor inherited from the parent, if any.
  const int inherited_fd = g_watcher_fd.exchange(-1);
  if (inherited_fd >= 0) {
    close(inherited_fd);
  }

  // Drop both callbacks so the child can never invoke them by accident.
  g_on_dead_thread = nullptr;
  g_on_new_thread = nullptr;
}

// Registers resetWatcherStateInChild as the pthread_atfork child handler.
// Safe to call repeatedly and from several threads: the handler is installed
// at most once per successful registration.
static void ensureForkHandlerRegistered() {
  // exchange() lets exactly one caller observe `false`, so two threads racing
  // through their first call cannot both install the handler.
  static std::atomic<bool> registered{false};
  if (registered.exchange(true)) {
    return;
  }

  if (pthread_atfork(nullptr, nullptr, resetWatcherStateInChild) != 0) {
    // Registration failed; clear the flag so a later call can try again
    // instead of silently running without a fork handler forever.
    registered.store(false);
  }
}

int ddprof::OS::truncateFile(int fd) {
int rslt = ftruncate(fd, 0);
if (rslt == 0) {
Expand Down Expand Up @@ -176,134 +136,4 @@ int ddprof::OS::getThreadCount() {
return thread_count;
}

/**
 * Starts a background thread that watches /proc/self/task through inotify and
 * reports thread ids to the supplied callbacks.
 *
 * @param on_new_thread  invoked with the tid for IN_CREATE / IN_MOVED_TO events
 * @param on_dead_thread invoked with the tid for IN_DELETE / IN_MOVED_FROM events
 * @return true if the watcher is running (including when it was already
 *         running, in which case the new callbacks are NOT installed);
 *         false if inotify setup or thread creation failed.
 *
 * NOTE(review): inotify(7) documents that pseudo-filesystems such as /proc are
 * not monitorable with inotify - confirm that this watch ever delivers events.
 */
bool ddprof::OS::startThreadDirectoryWatcher(const std::function<void(int)>& on_new_thread, const std::function<void(int)>& on_dead_thread) {
  // Ensure fork handler is registered to prevent deadlock in child processes
  ensureForkHandlerRegistered();

  if (g_watcher_running.load()) {
    return true;  // Already running
  }

  int inotify_fd = inotify_init1(IN_CLOEXEC | IN_NONBLOCK);
  if (inotify_fd == -1) {
    TEST_LOG("Failed to initialize inotify: %s", strerror(errno));
    return false;
  }

  int watch_fd = inotify_add_watch(inotify_fd, "/proc/self/task", IN_CREATE | IN_DELETE | IN_MOVED_FROM | IN_MOVED_TO);
  if (watch_fd == -1) {
    TEST_LOG("Failed to add inotify watch on /proc/self/task: %s", strerror(errno));
    close(inotify_fd);
    return false;
  }

  // Publish the state the watcher thread reads before the thread exists.
  g_on_new_thread = on_new_thread;
  g_on_dead_thread = on_dead_thread;
  g_watcher_fd.store(inotify_fd);
  g_watcher_running.store(true);

  // pthread_create reports failure through its return value and does not set
  // errno, so the returned code is what must be passed to strerror().
  const int create_rc = pthread_create(&g_watcher_thread, nullptr, threadDirectoryWatcherLoop, nullptr);
  if (create_rc != 0) {
    TEST_LOG("Failed to create thread directory watcher thread: %s", strerror(create_rc));
    g_watcher_running.store(false);
    g_watcher_fd.store(-1);
    close(inotify_fd);
    // Roll the callbacks back as well so no caller state stays referenced
    // after a failed start.
    g_on_new_thread = nullptr;
    g_on_dead_thread = nullptr;
    return false;
  }

  g_watcher_thread_created.store(true);
  TEST_LOG("Started thread directory watcher (thread will be joined on cleanup)");
  return true;
}

void ddprof::OS::stopThreadDirectoryWatcher() {
if (!g_watcher_running.load()) {
return;
}

TEST_LOG("Stopping thread directory watcher...");

// Signal the watcher thread to stop
g_watcher_running.store(false);

// Close the inotify fd to wake up select()
int fd = g_watcher_fd.exchange(-1);
if (fd >= 0) {
close(fd);
}

// Wait for the watcher thread to actually terminate
if (g_watcher_thread_created.load()) {
TEST_LOG("Waiting for watcher thread to terminate...");
void* retval;
int join_result = pthread_join(g_watcher_thread, &retval);
if (join_result != 0) {
TEST_LOG("Failed to join watcher thread: %s", strerror(join_result));
} else {
TEST_LOG("Watcher thread terminated successfully");
}
g_watcher_thread_created.store(false);
}

TEST_LOG("Thread directory watcher stopped");
}

/**
 * Thread entry point for the thread directory watcher.
 *
 * Waits on the inotify descriptor stored in g_watcher_fd, decodes each event
 * and forwards numeric entry names (thread ids) to g_on_new_thread or
 * g_on_dead_thread. Runs until g_watcher_running becomes false or an
 * unrecoverable select()/read() error occurs.
 *
 * @param arg unused
 * @return always nullptr
 */
static void* threadDirectoryWatcherLoop(void* arg) {
  (void)arg;

  const int fd = g_watcher_fd.load();
  if (fd < 0) return nullptr;

  // FD_SET on a descriptor >= FD_SETSIZE writes outside the fd_set, so refuse
  // to watch rather than corrupt the stack.
  // TODO(review): switching to poll() would lift this limit entirely.
  if (fd >= FD_SETSIZE) {
    TEST_LOG("Thread directory watcher fd %d is not usable with select()", fd);
    return nullptr;
  }

  // inotify(7) requires the read buffer to be suitably aligned for
  // struct inotify_event, since events are accessed in place.
  alignas(struct inotify_event) char buffer[4096];
  const ssize_t header_size = static_cast<ssize_t>(sizeof(struct inotify_event));

  while (g_watcher_running.load()) {
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);

    // select() may modify the timeout, so it is re-armed on every iteration.
    struct timeval timeout;
    timeout.tv_sec = 1;
    timeout.tv_usec = 0;

    const int ret = select(fd + 1, &readfds, nullptr, nullptr, &timeout);
    if (ret < 0) {
      if (errno != EINTR) {
        TEST_LOG("Thread directory watcher select failed: %s", strerror(errno));
        break;
      }
      continue;
    }

    if (ret == 0) continue;  // Timeout, check running flag

    const ssize_t len = read(fd, buffer, sizeof(buffer));
    if (len <= 0) {
      if (len < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
        TEST_LOG("Thread directory watcher read failed: %s", strerror(errno));
        break;
      }
      continue;
    }

    // Parse inotify events. Both the fixed header and the variable-length name
    // are bounds-checked so a short or malformed read cannot walk off the
    // buffer.
    ssize_t offset = 0;
    while (offset + header_size <= len) {
      const struct inotify_event* event =
          reinterpret_cast<const struct inotify_event*>(buffer + offset);
      const ssize_t event_size = header_size + static_cast<ssize_t>(event->len);
      if (offset + event_size > len) {
        break;  // Truncated event; drop the remainder of this read
      }

      if (event->mask & IN_Q_OVERFLOW) {
        TEST_LOG("Thread directory watcher queue overflow, triggering full rescan");
        // TODO: Trigger full rescan callback
      } else if (event->len > 0 && event->name[0] >= '1' && event->name[0] <= '9') {
        // Only names starting with a non-zero digit are treated as thread ids.
        const int tid = atoi(event->name);
        if (tid > 0) {
          if (event->mask & (IN_CREATE | IN_MOVED_TO)) {
            if (g_on_new_thread) g_on_new_thread(tid);
          } else if (event->mask & (IN_DELETE | IN_MOVED_FROM)) {
            if (g_on_dead_thread) g_on_dead_thread(tid);
          }
        }
      }

      offset += event_size;
    }
  }

  return nullptr;
}

#endif // __linux__
16 changes: 4 additions & 12 deletions ddprof-lib/src/main/cpp/os_macos_dd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,28 @@ void ddprof::OS::enumerateThreadIds(const std::function<void(int)>& callback) {
/**
 * macOS stand-in for thread signaling: it only logs and never delivers a
 * signal.
 *
 * @param tid    on macOS this is actually a mach thread port, not a kernel tid
 * @param signum signal that would have been sent
 */
void ddprof::OS::signalThread(int tid, int signum) {
  // Signaling would require converting the mach thread to a pthread, and
  // arbitrary mach threads cannot easily be signaled. In practice this is
  // mainly used for TLS priming, which is disabled on macOS.
  TEST_LOG("Thread signaling not fully supported on macOS (thread=%d, signal=%d)", tid, signum);
}

/**
 * macOS implementation: thread directory watching is not supported, so the
 * callbacks are never stored or invoked.
 *
 * @return always false
 */
bool ddprof::OS::startThreadDirectoryWatcher(const std::function<void(int)>& on_new_thread, const std::function<void(int)>& on_dead_thread) {
  constexpr bool kWatcherStarted = false;  // no watcher exists on this platform
  return kWatcherStarted;
}

int ddprof::OS::getThreadCount() {
task_t task = mach_task_self();
thread_act_array_t thread_list;
mach_msg_type_number_t thread_count;

kern_return_t kr = task_threads(task, &thread_list, &thread_count);
if (kr != KERN_SUCCESS) {
TEST_LOG("Failed to get thread count: %d", kr);
return 0;
}

// Clean up
vm_deallocate(task, (vm_address_t)thread_list, thread_count * sizeof(thread_t));

return static_cast<int>(thread_count);
}

// macOS never starts a thread directory watcher (startThreadDirectoryWatcher
// always returns false there), so there is nothing to stop. A void function
// must not return a value, hence the empty body.
void ddprof::OS::stopThreadDirectoryWatcher() {
  // No-op on macOS
}

#endif // __APPLE__
28 changes: 2 additions & 26 deletions ddprof-lib/src/main/cpp/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ ProfiledThread::initCurrentThreadWithBuffer() {
// Fork handler for the TLS priming state: invalidates the recorded priming
// signal number. Takes no arguments and returns nothing; its only effect is
// the write to g_tls_prime_signal.
static void resetTlsPrimingStateInChild() {
// After fork(), reset signal number to prevent cleanup attempts
// (-1 fails the `g_tls_prime_signal > 0` guard used before uninstalling).
g_tls_prime_signal = -1;

// Note: The watcher state is reset by os_linux_dd.cpp fork handler
// This just ensures we don't try to uninstall signals or cleanup resources
}

// Register fork handler on first initialization
Expand Down Expand Up @@ -140,27 +137,10 @@ void ProfiledThread::doInitExistingThreads() {

TEST_LOG("Successfully installed TLS priming handler on RT signal %d", g_tls_prime_signal);

// Use a modest buffer size since we're only handling new threads via watcher
// 256 should be more than enough for concurrent new thread creation
// Use a modest buffer size for concurrent thread TLS initialization
// 256 should be more than enough for typical workloads
prepareBuffer(256);

// Start thread directory watcher to prime new threads (no mass-priming of existing threads)
bool watcher_started = ddprof::OS::startThreadDirectoryWatcher(
[](int tid) {
// Prime new thread with TLS signal
ddprof::OS::signalThread(tid, g_tls_prime_signal);
},
[](int tid) {
// No-op for dead threads - cleanup handled elsewhere
}
);

if (!watcher_started) {
TEST_LOG("Failed to start thread directory watcher for TLS priming");
} else {
TEST_LOG("Started thread directory watcher for TLS priming");
}

initialized = true;
}

Expand All @@ -169,10 +149,6 @@ void ProfiledThread::cleanupTlsPriming() {
return;
}

// Stop the thread directory watcher
ddprof::OS::stopThreadDirectoryWatcher();
TEST_LOG("Stopped thread directory watcher");

// Uninstall the TLS priming signal handler
if (g_tls_prime_signal > 0) {
ddprof::OS::uninstallTlsPrimeSignalHandler(g_tls_prime_signal);
Expand Down
51 changes: 6 additions & 45 deletions ddprof-lib/src/test/cpp/test_tlsPriming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,30 @@ TEST_F(TlsPrimingTest, GetThreadCount) {

TEST_F(TlsPrimingTest, SignalCurrentThread) {
int signal_num = ddprof::OS::installTlsPrimeSignalHandler(testTlsSignalHandler, 6);

#ifdef __linux__
if (signal_num > 0) {
TEST_LOG("Signaling current thread with signal %d", signal_num);

// Get the first thread ID from enumeration
std::atomic<int> first_tid{-1};
ddprof::OS::enumerateThreadIds([&](int tid) {
if (first_tid.load() == -1) {
first_tid.store(tid);
}
});

int tid = first_tid.load();
if (tid >= 0) {
ddprof::OS::signalThread(tid, signal_num);

// Wait a bit for signal to be delivered
std::this_thread::sleep_for(std::chrono::milliseconds(100));

EXPECT_GT(g_signal_received.load(), 0);
EXPECT_GT(g_threads_primed.load(), 0);
EXPECT_EQ(g_test_tls, 0x1234ABCD);

TEST_LOG("Signal delivered successfully, TLS primed");
} else {
TEST_LOG("No threads found for signaling");
Expand All @@ -124,45 +124,6 @@ TEST_F(TlsPrimingTest, SignalCurrentThread) {
#endif
}

// Smoke test for the thread directory watcher: start it, create and join one
// short-lived thread, stop it, and log how many events were observed. No
// assertion is made on the counts because event delivery is timing-dependent.
TEST_F(TlsPrimingTest, ThreadDirectoryWatcher) {
  std::atomic<int> new_threads{0};
  std::atomic<int> dead_threads{0};

  auto on_thread_created = [&](int tid) {
    TEST_LOG("New thread detected: %d", tid);
    new_threads++;
  };
  auto on_thread_exited = [&](int tid) {
    TEST_LOG("Thread died: %d", tid);
    dead_threads++;
  };

  const bool started =
      ddprof::OS::startThreadDirectoryWatcher(on_thread_created, on_thread_exited);

  // Platforms without a watcher report failure; that is not a test failure.
  if (!started) {
    TEST_LOG("Thread directory watcher not supported on this platform");
    return;
  }

  TEST_LOG("Thread directory watcher started successfully");

  // Create a short-lived thread to trigger the watcher
  std::thread test_thread([]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  });
  test_thread.join();

  // Give the watcher time to pick up the directory changes
  std::this_thread::sleep_for(std::chrono::milliseconds(200));

  ddprof::OS::stopThreadDirectoryWatcher();
  TEST_LOG("Thread directory watcher stopped");

  // We might see events, but it's not guaranteed due to timing
  TEST_LOG("Detected %d new threads, %d dead threads",
           new_threads.load(), dead_threads.load());
}

// Test TLS cleanup for JVMTI-allocated threads (non-buffer)
TEST_F(TlsPrimingTest, JvmtiThreadCleanup) {
TEST_LOG("Testing JVMTI-allocated thread cleanup");
Expand Down
Loading
Loading