Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,46 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.util.ExecutorUtils;
import co.elastic.otel.JvmtiAccess;
import co.elastic.otel.UniversalProfilingCorrelation;
import co.elastic.otel.profiler.DecodeException;
import co.elastic.otel.profiler.ProfilerMessage;
import co.elastic.otel.profiler.ProfilerRegistrationMessage;
import co.elastic.otel.profiler.TraceCorrelationMessage;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Base64;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public class UniversalProfilingIntegration {

/**
* The frequency at which the processor polls the unix domain socket for new messages from the
* profiler.
*/
static final long POLL_FREQUENCY_MS = 20;

private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class);

private volatile ElasticApmTracer tracer;

// Visible for testing
volatile boolean isActive = false;

// Visible for testing
String socketPath = null;

private ScheduledExecutorService executor;

private ActivationListener activationListener = new ActivationListener() {

@Override
Expand Down Expand Up @@ -70,22 +95,51 @@ public void start(ElasticApmTracer tracer) {
try {
log.debug("Starting universal profiling correlation");

socketPath = openProfilerSocket(config.getSocketDir());

CoreConfiguration coreConfig = tracer.getConfig(CoreConfiguration.class);
ByteBuffer processCorrelationStorage = ProfilerSharedMemoryWriter.generateProcessCorrelationStorage(
coreConfig.getServiceName(), coreConfig.getEnvironment(), "");
coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath);
UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage);

executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration");
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
periodicTimer();
}
}, POLL_FREQUENCY_MS, POLL_FREQUENCY_MS, TimeUnit.MILLISECONDS);

isActive = true;
tracer.registerSpanListener(activationListener);
} catch (Exception e) {
log.error("Failed to start universal profiling integration", e);
if (socketPath != null) {
try {
UniversalProfilingCorrelation.stopProfilerReturnChannel();
socketPath = null;
} catch (Exception e2) {
log.error("Failed to clean up universal profiling integration socket", e2);
}
}
}
}

private void periodicTimer() {
consumeProfilerMessages();
}

public void stop() {
try {
if (executor != null) {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
executor = null;
}
if (isActive) {
UniversalProfilingCorrelation.stopProfilerReturnChannel();
JvmtiAccess.destroy();
isActive = false;
}
} catch (Exception e) {
log.error("Failed to stop universal profiling integration", e);
Expand Down Expand Up @@ -114,4 +168,65 @@ public void correlateAndReport(Transaction endedTransaction) {
public void drop(Transaction endedTransaction) {
//TODO: remove dropped transactions from correlation storage without reporting
}


private String openProfilerSocket(String socketDir) {
Path dir = Paths.get(socketDir);
if (!Files.exists(dir) && !dir.toFile().mkdirs()) {
throw new IllegalArgumentException("Could not create directory '" + socketDir + "'");
}
Path socketFile;
do {
socketFile = dir.resolve(randomSocketFileName());
} while (Files.exists(socketFile));

String absolutePath = socketFile.toAbsolutePath().toString();
log.debug("Opening profiler correlation socket {}", absolutePath);
UniversalProfilingCorrelation.startProfilerReturnChannel(absolutePath);
return absolutePath;
}

private String randomSocketFileName() {
StringBuilder name = new StringBuilder("essock");
String allowedChars = "abcdefghijklmonpqrstuvwxzyABCDEFGHIJKLMONPQRSTUVWXYZ0123456789";
Random rnd = new Random();
for (int i = 0; i < 8; i++) {
int idx = rnd.nextInt(allowedChars.length());
name.append(allowedChars.charAt(idx));
}
return name.toString();
}

private void consumeProfilerMessages() {
try {
while (true) {
try {
ProfilerMessage message =
UniversalProfilingCorrelation.readProfilerReturnChannelMessage();
if (message == null) {
break;
} else if (message instanceof TraceCorrelationMessage) {
handleMessage((TraceCorrelationMessage) message);
} else if (message instanceof ProfilerRegistrationMessage) {
handleMessage((ProfilerRegistrationMessage) message);
} else {
log.debug("Received unknown message type from profiler: {}", message);
}
} catch (DecodeException e) {
log.warn("Failed to read profiler message", e);
// intentionally no break here, subsequent messages might be decodeable
}
}
} catch (Exception e) {
log.error("Cannot read from profiler socket", e);
}
}

private void handleMessage(ProfilerRegistrationMessage message) {
//TODO: handle message
}

private void handleMessage(TraceCorrelationMessage message) {
//TODO: handle message
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.stagemonitor.configuration.ConfigurationRegistry;
Expand All @@ -46,6 +47,9 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat;
Expand All @@ -62,6 +66,16 @@ public class UniversalProfilingIntegrationTest {

private TestObjectPoolFactory poolFactory;

@TempDir
Path tempDir;

void setupTracer() {
setupTracer(config -> {
UniversalProfilingConfiguration conf = config.getConfig(UniversalProfilingConfiguration.class);
doReturn(true).when(conf).isEnabled();
doReturn(tempDir.toAbsolutePath().toString()).when(conf).getSocketDir();
});
}
void setupTracer(Consumer<ConfigurationRegistry> configCustomizer) {
ConfigurationRegistry config = SpyConfiguration.createSpyConfig();
configCustomizer.accept(config);
Expand Down Expand Up @@ -96,6 +110,7 @@ public void ensureDisabledOnWindows() {

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Test
Expand All @@ -107,6 +122,7 @@ public void ensureDisabledByDefault() {

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Nested
Expand All @@ -115,8 +131,7 @@ class SharedMemory {

@Test
public void testNestedActivations() {
setupTracer(conf -> doReturn(true)
.when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled());
setupTracer();

Transaction first = tracer.startRootTransaction(null);
Transaction second = tracer.startRootTransaction(null);
Expand Down Expand Up @@ -168,7 +183,9 @@ public void testNestedActivations() {
@ValueSource(strings = {"my nameßspace", ""})
public void testProcessStoragePopulated(String environment) {
setupTracer(conf -> {
doReturn(true).when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled();
UniversalProfilingConfiguration profConfig = conf.getConfig(UniversalProfilingConfiguration.class);
doReturn(true).when(profConfig).isEnabled();
doReturn(tempDir.toAbsolutePath().toString()).when(profConfig).getSocketDir();
CoreConfiguration core = conf.getConfig(CoreConfiguration.class);
doReturn("service Ä 1").when(core).getServiceName();
doReturn(environment).when(core).getEnvironment();
Expand All @@ -179,7 +196,8 @@ public void testProcessStoragePopulated(String environment) {
assertThat(buffer.getChar()).describedAs("layout-minor-version").isEqualTo((char) 1);
assertThat(readUtf8Str(buffer)).isEqualTo("service Ä 1");
assertThat(readUtf8Str(buffer)).isEqualTo(environment);
assertThat(readUtf8Str(buffer)).describedAs("socket-path").isEqualTo("");
assertThat(readUtf8Str(buffer)).describedAs("socket-path")
.startsWith(tempDir.toAbsolutePath().toString() + "/essock");
}

private String readUtf8Str(ByteBuffer buffer) {
Expand Down Expand Up @@ -220,6 +238,58 @@ static void checkTlsIs(@Nullable AbstractSpan<?> span) {
}
}

@DisabledOnOs(OS.WINDOWS)
@Nested
class SpanCorrelation {

@Test
void badSocketPath() throws Exception {
Path notADir = tempDir.resolve("not_a_dir");
Files.createFile(notADir);
String absPath = notADir.toAbsolutePath().toString();

ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig();
UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class);

doReturn(true).when(config).isEnabled();
doReturn(absPath).when(config).getSocketDir();

UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration();
ElasticApmTracer mockTracer = MockTracer.create(configRegistry);

universalProfilingIntegration.start(mockTracer);

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Test
void socketParentDirCreated() throws Exception {
Path subDirs = tempDir.resolve("create/me");
String absolute = subDirs.toAbsolutePath().toString();

ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig();
UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class);

doReturn(true).when(config).isEnabled();
doReturn(absolute).when(config).getSocketDir();

UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration();
ElasticApmTracer mockTracer = MockTracer.create(configRegistry);

universalProfilingIntegration.start(mockTracer);
try {
assertThat(Paths.get(universalProfilingIntegration.socketPath)).exists();
assertThat(universalProfilingIntegration.socketPath).startsWith(absolute + "/");
} finally {
universalProfilingIntegration.stop();
}
}

}


private static byte[] idToBytes(Id id) {
byte[] buff = new byte[32];
int len = id.toBytes(buff, 0);
Expand Down