Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.Optional;

public class StreamsMetricsImpl implements StreamsMetrics {

Expand Down Expand Up @@ -210,11 +211,12 @@ public final Sensor threadLevelSensor(final String threadId,
final Sensor... parents) {
final String key = threadSensorPrefix(threadId);
synchronized (threadLevelSensors) {
threadLevelSensors.putIfAbsent(key, new LinkedList<>());
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
threadLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
threadLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

Expand Down Expand Up @@ -290,13 +292,12 @@ public final Sensor taskLevelSensor(final String threadId,
final Sensor... parents) {
final String key = taskSensorPrefix(threadId, taskId);
synchronized (taskLevelSensors) {
if (!taskLevelSensors.containsKey(key)) {
taskLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
taskLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
taskLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

Expand All @@ -323,17 +324,12 @@ public Sensor nodeLevelSensor(final String threadId,
final Sensor... parents) {
final String key = nodeSensorPrefix(threadId, taskId, processorNodeName);
synchronized (nodeLevelSensors) {
if (!nodeLevelSensors.containsKey(key)) {
nodeLevelSensors.put(key, new LinkedList<>());
}

final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;

final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);

nodeLevelSensors.get(key).push(fullSensorName);

return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
nodeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

Expand Down Expand Up @@ -362,17 +358,12 @@ public Sensor cacheLevelSensor(final String threadId,
final Sensor... parents) {
final String key = cacheSensorPrefix(threadId, taskName, storeName);
synchronized (cacheLevelSensors) {
if (!cacheLevelSensors.containsKey(key)) {
cacheLevelSensors.put(key, new LinkedList<>());
}

final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;

final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);

cacheLevelSensors.get(key).push(fullSensorName);

return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
cacheLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

Expand Down Expand Up @@ -413,15 +404,12 @@ public final Sensor storeLevelSensor(final String threadId,
final Sensor... parents) {
final String key = storeSensorPrefix(threadId, taskId, storeName);
synchronized (storeLevelSensors) {
if (!storeLevelSensors.containsKey(key)) {
storeLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);

storeLevelSensors.get(key).push(fullSensorName);

return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,235 @@ private void addSensorsOnAllLevels(final Metrics metrics, final StreamsMetricsIm
streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, storeName, sensorName2, RecordingLevel.INFO);
}

private void setupGetSensorTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
final String fullSensorName =
INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1;
private void setupGetNewSensorTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(null);
final Sensor[] parents = {};
expect(metrics.sensor(fullSensorName, recordingLevel, parents)).andReturn(sensor);
replay(metrics);
}

private void setupGetExistingSensorTest(final Metrics metrics,
final String level) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(sensor);
replay(metrics);
}

private String fullSensorName(final String level) {
return INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1;
}

@Test
public void shouldGetNewThreadLevelSensor() {
Comment thread
highluck marked this conversation as resolved.
Outdated
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is(equalToObject(sensor))) and not just is(sensor)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is on me. In this verification it is important to check for reference equality, because you want that threadLevelSensor() returns the sensor that is registered in the Metrics object or that was created by Metrics#sensor().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I get that we want to make sure the same instance is returned. But since Sensor doesn't override equals, is(sensor) should still do an instance equality check. It's really a minor point, so I don't care too much if we keep it as is.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used equalToObject() because it makes the intent more explicit.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, let's just leave it as is then. Thanks for the explanation.

}

@Test
public void shouldGetExistingThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

}

@Test
public void shouldGetNewTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto and same for tests below, I won't repeat this comment.

}

@Test
public void shouldGetExistingTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID + ".task." + TASK_ID);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetNewStoreLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(
metrics,
THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetExistingStoreLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetNewNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + "node"
+ SENSOR_PREFIX_DELIMITER + processorNodeName,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
processorNodeName,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetExistingNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "node" + SENSOR_PREFIX_DELIMITER + processorNodeName
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
processorNodeName,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetNewCacheLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID,
TASK_ID,
processorCacheName,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldGetExistingCacheLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID, TASK_ID,
processorCacheName,
sensorName1,
recordingLevel
);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

@Test
public void shouldAddClientLevelImmutableMetric() {
final Metrics metrics = mock(Metrics.class);
Expand Down Expand Up @@ -244,19 +463,6 @@ public void shouldProvideCorrectStrings() {
assertThat(ROLLUP_VALUE, is("all"));
}

@Test
public void shouldGetThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetSensorTest(metrics, THREAD_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);

final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID, sensorName1, recordingLevel);

verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}

private void setupRemoveSensorsTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
Expand Down