From 510d68f1f16e0f31ad90dade500f64ee09f9bcdf Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 26 Apr 2024 14:15:03 +0200 Subject: [PATCH 1/3] Implemented socket opening and closing --- .../UniversalProfilingIntegration.java | 48 +++++++++++- .../UniversalProfilingIntegrationTest.java | 78 ++++++++++++++++++- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index 251aeaf6df..d6ce765503 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -33,6 +33,10 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Random; public class UniversalProfilingIntegration { @@ -43,6 +47,9 @@ public class UniversalProfilingIntegration { // Visible for testing volatile boolean isActive = false; + // Visible for testing + String socketPath = null; + private ActivationListener activationListener = new ActivationListener() { @Override @@ -70,21 +77,32 @@ public void start(ElasticApmTracer tracer) { try { log.debug("Starting universal profiling correlation"); + socketPath = openProfilerSocket(config.getSocketDir()); + CoreConfiguration coreConfig = tracer.getConfig(CoreConfiguration.class); ByteBuffer processCorrelationStorage = ProfilerSharedMemoryWriter.generateProcessCorrelationStorage( - coreConfig.getServiceName(), coreConfig.getEnvironment(), ""); + coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath); UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage); isActive = true; tracer.registerSpanListener(activationListener); } catch (Exception e) { log.error("Failed to start universal profiling integration", e); + if (socketPath != null) { + try { + UniversalProfilingCorrelation.stopProfilerReturnChannel(); + socketPath = null; + } catch (Exception e2) { + log.error("Failed to clean up universal profiling integration socket", e2); + } + } } } public void stop() { try { if (isActive) { + UniversalProfilingCorrelation.stopProfilerReturnChannel(); JvmtiAccess.destroy(); } } catch (Exception e) { @@ -114,4 +132,32 @@ public void correlateAndReport(Transaction endedTransaction) { public void drop(Transaction endedTransaction) { //TODO: remove dropped transactions from correlation storage without reporting } + + + private String openProfilerSocket(String socketDir) { + Path dir = Paths.get(socketDir); + if (!Files.exists(dir) && !dir.toFile().mkdirs()) { + throw new IllegalArgumentException("Could not create directory '" + socketDir + "'"); + } + Path socketFile; + do { + socketFile = dir.resolve(randomSocketFileName()); + } while (Files.exists(socketFile)); + + String absolutePath = socketFile.toAbsolutePath().toString(); + log.debug("Opening profiler correlation socket {}", absolutePath); + UniversalProfilingCorrelation.startProfilerReturnChannel(absolutePath); + return absolutePath; + } + + private String randomSocketFileName() { + StringBuilder name = new StringBuilder("essock"); + String allowedChars = "abcdefghijklmonpqrstuvwxzyABCDEFGHIJKLMONPQRSTUVWXYZ0123456789"; + Random rnd = new Random(); + for (int i = 0; i < 8; i++) { + int idx = rnd.nextInt(allowedChars.length()); + name.append(allowedChars.charAt(idx)); + } + return name.toString(); + } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java index 867efae359..81b64c67cb 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.stagemonitor.configuration.ConfigurationRegistry; @@ -46,6 +47,9 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.function.Consumer; import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat; @@ -62,6 +66,16 @@ public class UniversalProfilingIntegrationTest { private TestObjectPoolFactory poolFactory; + @TempDir + Path tempDir; + + void setupTracer() { + setupTracer(config -> { + UniversalProfilingConfiguration conf = config.getConfig(UniversalProfilingConfiguration.class); + doReturn(true).when(conf).isEnabled(); + doReturn(tempDir.toAbsolutePath().toString()).when(conf).getSocketDir(); + }); + } void setupTracer(Consumer configCustomizer) { ConfigurationRegistry config = SpyConfiguration.createSpyConfig(); configCustomizer.accept(config); @@ -96,6 +110,7 @@ public void ensureDisabledOnWindows() { verify(mockTracer, never()).registerSpanListener(any()); assertThat(universalProfilingIntegration.isActive).isFalse(); + assertThat(universalProfilingIntegration.socketPath).isNull(); } @Test @@ -107,6 +122,7 @@ public void ensureDisabledByDefault() { verify(mockTracer, never()).registerSpanListener(any()); assertThat(universalProfilingIntegration.isActive).isFalse(); + assertThat(universalProfilingIntegration.socketPath).isNull(); } @Nested @@ -115,8 +131,7 @@ class SharedMemory { @Test public void testNestedActivations() { - setupTracer(conf -> doReturn(true) - .when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled()); + setupTracer(); Transaction first = tracer.startRootTransaction(null); Transaction second = tracer.startRootTransaction(null); @@ -168,7 +183,9 @@ public void testNestedActivations() { @ValueSource(strings = {"my nameßspace", ""}) public void testProcessStoragePopulated(String environment) { setupTracer(conf -> { - doReturn(true).when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled(); + UniversalProfilingConfiguration profConfig = conf.getConfig(UniversalProfilingConfiguration.class); + doReturn(true).when(profConfig).isEnabled(); + doReturn(tempDir.toAbsolutePath().toString()).when(profConfig).getSocketDir(); CoreConfiguration core = conf.getConfig(CoreConfiguration.class); doReturn("service Ä 1").when(core).getServiceName(); doReturn(environment).when(core).getEnvironment(); @@ -179,7 +196,8 @@ public void testProcessStoragePopulated(String environment) { assertThat(buffer.getChar()).describedAs("layout-minor-version").isEqualTo((char) 1); assertThat(readUtf8Str(buffer)).isEqualTo("service Ä 1"); assertThat(readUtf8Str(buffer)).isEqualTo(environment); - assertThat(readUtf8Str(buffer)).describedAs("socket-path").isEqualTo(""); + assertThat(readUtf8Str(buffer)).describedAs("socket-path") + .startsWith(tempDir.toAbsolutePath().toString() + "/essock"); } private String readUtf8Str(ByteBuffer buffer) { @@ -220,6 +238,58 @@ static void checkTlsIs(@Nullable AbstractSpan span) { } } + @DisabledOnOs(OS.WINDOWS) + @Nested + class SpanCorrelation { + + @Test + void badSocketPath() throws Exception { + Path notADir = tempDir.resolve("not_a_dir"); + Files.createFile(notADir); + String absPath = notADir.toAbsolutePath().toString(); + + ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig(); + UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class); + + doReturn(true).when(config).isEnabled(); + doReturn(absPath).when(config).getSocketDir(); + + UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration(); + ElasticApmTracer mockTracer = MockTracer.create(configRegistry); + + universalProfilingIntegration.start(mockTracer); + + verify(mockTracer, never()).registerSpanListener(any()); + assertThat(universalProfilingIntegration.isActive).isFalse(); + assertThat(universalProfilingIntegration.socketPath).isNull(); + } + + @Test + void socketParentDirCreated() throws Exception { + Path subDirs = tempDir.resolve("create/me"); + String absolute = subDirs.toAbsolutePath().toString(); + + ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig(); + UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class); + + doReturn(true).when(config).isEnabled(); + doReturn(absolute).when(config).getSocketDir(); + + UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration(); + ElasticApmTracer mockTracer = MockTracer.create(configRegistry); + + universalProfilingIntegration.start(mockTracer); + try { + assertThat(Paths.get(universalProfilingIntegration.socketPath)).exists(); + assertThat(universalProfilingIntegration.socketPath).startsWith(absolute + "/"); + } finally { + universalProfilingIntegration.stop(); + } + } + + } + + private static byte[] idToBytes(Id id) { byte[] buff = new byte[32]; int len = id.toBytes(buff, 0); From 98cd851d75544948667e1ac8db6d45c880fe11d7 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 26 Apr 2024 14:56:25 +0200 Subject: [PATCH 2/3] Added executor for consuming messages --- .../UniversalProfilingIntegration.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index d6ce765503..ffa2faa26e 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -28,18 +28,34 @@ import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.sdk.logging.Logger; import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import co.elastic.apm.agent.util.ExecutorUtils; import co.elastic.otel.JvmtiAccess; import co.elastic.otel.UniversalProfilingCorrelation; +import co.elastic.otel.profiler.DecodeException; +import co.elastic.otel.profiler.ProfilerMessage; +import co.elastic.otel.profiler.ProfilerRegistrationMessage; +import co.elastic.otel.profiler.TraceCorrelationMessage; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; +import java.util.Base64; import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; public class UniversalProfilingIntegration { + /** + * The frequency at which the processor polls the unix domain socket for new messages from the + * profiler. + */ + static final long POLL_FREQUENCY_MS = 20; + private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class); private volatile ElasticApmTracer tracer; @@ -50,6 +66,8 @@ public class UniversalProfilingIntegration { // Visible for testing String socketPath = null; + private ScheduledExecutorService executor; + private ActivationListener activationListener = new ActivationListener() { @Override @@ -84,6 +102,14 @@ public void start(ElasticApmTracer tracer) { coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath); UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage); + executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration"); + executor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + periodicTimer(); + } + }, POLL_FREQUENCY_MS, POLL_FREQUENCY_MS, TimeUnit.MILLISECONDS); + isActive = true; tracer.registerSpanListener(activationListener); } catch (Exception e) { @@ -99,8 +125,17 @@ public void start(ElasticApmTracer tracer) { } } + private void periodicTimer() { + consumeProfilerMessages(); + } + public void stop() { try { + if (executor != null) { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + executor = null; + } if (isActive) { UniversalProfilingCorrelation.stopProfilerReturnChannel(); JvmtiAccess.destroy(); @@ -160,4 +195,37 @@ private String randomSocketFileName() { } return name.toString(); } + + private void consumeProfilerMessages() { + try { + while (true) { + try { + ProfilerMessage message = + UniversalProfilingCorrelation.readProfilerReturnChannelMessage(); + if (message == null) { + break; + } else if (message instanceof TraceCorrelationMessage) { + handleMessage((TraceCorrelationMessage) message); + } else if (message instanceof ProfilerRegistrationMessage) { + handleMessage((ProfilerRegistrationMessage) message); + } else { + log.debug("Received unknown message type from profiler: {}", message); + } + } catch (DecodeException e) { + log.warn("Failed to read profiler message", e); + // intentionally no break here, subsequent messages might be decodeable + } + } + } catch (Exception e) { + log.error("Cannot read from profiler socket", e); + } + } + + private void handleMessage(ProfilerRegistrationMessage message) { + //TODO: handle message + } + + private void handleMessage(TraceCorrelationMessage message) { + //TODO: handle message + } } From 76319ad8a0989d9a3f38f86046757257ee2515df Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 2 May 2024 09:39:41 +0200 Subject: [PATCH 3/3] Reset isActive --- .../agent/universalprofiling/UniversalProfilingIntegration.java | 1 + 1 file changed, 1 insertion(+) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index ffa2faa26e..33897cbfa6 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -139,6 +139,7 @@ public void stop() { if (isActive) { UniversalProfilingCorrelation.stopProfilerReturnChannel(); JvmtiAccess.destroy(); + isActive = false; } } catch (Exception e) { log.error("Failed to stop universal profiling integration", e);