Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions vpro-shared-monitoring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.6.6</version>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>2.10.6</version>
<version>2.10.9.2</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package nl.vpro.monitoring.config;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.cache.EhCache2Metrics;
import io.micrometer.core.instrument.binder.db.PostgreSQLDatabaseMetrics;
Expand All @@ -26,6 +27,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import nl.vpro.util.locker.ObjectLocker;
import nl.vpro.util.locker.ObjectLockerAdmin;

import static nl.vpro.util.locker.ObjectLocker.Listener.Type.LOCK;

@Configuration
public class MonitoringConfig {

Expand Down Expand Up @@ -92,6 +98,40 @@ public PrometheusMeterRegistry globalMeterRegistry() {
new DiskSpaceMetrics(new File(folder)).bindTo(registry);
}
}
if (properties.isMeterLocks()) {
ObjectLocker.listen((type, holder, duration) -> {
if (holder.lock.getHoldCount() == 1 && type == LOCK) {
Object key = holder.key;
String keyType = key instanceof ObjectLocker.DefinesType ? String.valueOf(((ObjectLocker.DefinesType) key).getType()) : key.getClass().getSimpleName();
registry.counter("locks.event", "type", keyType).increment();
}
});
Gauge.builder("locks.count", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getCurrentCount)
.description("The current number of locked objects")
.register(registry);

Gauge.builder("locks.total_count", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getLockCount)
.description("The total number of locked objects untill now")
.register(registry);

Gauge.builder("locks.average_acquiretime", () -> ObjectLockerAdmin.JMX_INSTANCE.getAverageLockAcquireTime().getWindowValue().getValue())
.description("The average time in ms to acquire a lock")
.register(registry);

Gauge.builder("locks.max_concurrency", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getMaxConcurrency)
.description("The maximum number threads waiting for the same object")
.register(registry);

Gauge.builder("locks.current_max_concurrency", () -> ObjectLocker.getLockedObjects().values().stream().mapToInt(l -> l.lock.getHoldCount()).max().orElse(0))
.description("The maximum number threads waiting for the same object")
.register(registry);

Gauge.builder("locks.maxDepth", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getMaxConcurrency)
.description("The maximum number of locked objects in the same thread")
.register(registry);

}

return registry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,7 @@ public class MonitoringProperties {

@Value("${monitoring.volumes:#{null}}")
private List<String> meterVolumes;

@Value("${monitoring.locks:true}")
private boolean meterLocks;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
Expand All @@ -19,8 +18,6 @@

import nl.vpro.logging.Slf4jHelper;

import static nl.vpro.util.locker.ObjectLockerAdmin.JMX_INSTANCE;

/**
* @author Michiel Meeuwissen
*/
Expand All @@ -42,11 +39,50 @@ private ObjectLocker() {
static Duration maxLockAcquireTime = Duration.ofMinutes(10);
static Duration minWaitTime = Duration.ofSeconds(5);


private static final List<Listener> LISTENERS = new CopyOnWriteArrayList<>();

public static void listen(Listener listener){
LISTENERS.add(listener);
}

public static void unlisten(Listener listener){
LISTENERS.remove(listener);
}


@FunctionalInterface
public interface Listener extends EventListener {
enum Type {
LOCK,
UNLOCK
}

void event(Type type, LockHolder<?> holder, Duration duration);

default void lock(LockHolder<?> lock, Duration duration) {
event(Type.LOCK, lock, duration);
}

default void unlock(LockHolder<?> lock, Duration duration) {
event(Type.UNLOCK, lock, duration);
}
}

public interface DefinesType {
Object getType();
}

/**
* Map key -> ReentrantLock
*/
static final Map<Serializable, LockHolder<Serializable>> LOCKED_OBJECTS = new ConcurrentHashMap<>();


public static Map<Serializable, LockHolder<Serializable>> getLockedObjects() {
return Collections.unmodifiableMap(LOCKED_OBJECTS);
}

public static <T> T withKeyLock(
Serializable id,
@NonNull String reason,
Expand Down Expand Up @@ -109,7 +145,6 @@ private static <K extends Serializable> LockHolder<K> acquireLock(
holder = locks.computeIfAbsent(key, (m) -> computeLock(m, reason, comparable));
if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), key);
JMX_INSTANCE.maxConcurrency = Math.max(holder.lock.getQueueLength(), JMX_INSTANCE.maxConcurrency);
alreadyWaiting = true;
}
}
Expand All @@ -123,13 +158,15 @@ private static <K extends Serializable> LockHolder<K> acquireLock(
log.debug("Released and continuing {}", key);
}

JMX_INSTANCE.maxDepth = Math.max(JMX_INSTANCE.maxDepth, holder.lock.getHoldCount());
log.trace("{} holdcount {}", Thread.currentThread().hashCode(), holder.lock.getHoldCount());
Duration acquireTime = Duration.ofNanos(System.nanoTime() - nanoStart);
if (holder.lock.getHoldCount() == 1) {
JMX_INSTANCE.lockCount.computeIfAbsent(reason, s -> new AtomicInteger(0)).incrementAndGet();
JMX_INSTANCE.currentCount.computeIfAbsent(reason, s -> new AtomicInteger()).incrementAndGet();
Duration aquireTime = Duration.ofNanos(System.nanoTime() - nanoStart);
log.debug("Acquired lock for {} ({}) in {}", key, reason, aquireTime);
log.debug("Acquired lock for {} ({}) in {}", key, reason, acquireTime);
}


for(Listener listener : LISTENERS) {
listener.lock(holder, acquireTime);
}
return holder;
}
Expand Down Expand Up @@ -183,8 +220,11 @@ private static <K extends Serializable> void releaseLock(long nanoStart, K key,
log.trace("Removed {}", key);
locks.remove(key);
}
JMX_INSTANCE.currentCount.computeIfAbsent(reason, s -> new AtomicInteger()).decrementAndGet();
Duration duration = Duration.ofNanos(System.nanoTime() - nanoStart);
for (Listener listener : LISTENERS) {
listener.unlock(lock, duration);
}

Slf4jHelper.log(log, duration.compareTo(Duration.ofSeconds(30))> 0 ? Level.WARN : Level.DEBUG,
"Released lock for {} ({}) in {}", key, reason, Duration.ofNanos(System.nanoTime() - nanoStart));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package nl.vpro.util.locker;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.meeuw.math.statistics.StatisticalLong;
import org.meeuw.math.windowed.WindowedEventRate;
import org.meeuw.math.windowed.WindowedStatisticalLong;

import nl.vpro.jmx.MBeans;
import nl.vpro.util.TimeUtils;

/**
* @author Michiel Meeuwissen
* @since 5.8
*/
@Slf4j
public class ObjectLockerAdmin implements ObjectLockerAdminMXBean {


Expand All @@ -32,25 +37,66 @@ public class ObjectLockerAdmin implements ObjectLockerAdminMXBean {
}
}

private ObjectLockerAdmin() {

@Getter
final WindowedStatisticalLong averageLockAcquireTime = WindowedStatisticalLong.builder()
.mode(StatisticalLong.Mode.DURATION)
.window(Duration.ofMinutes(60))
.bucketCount(60)
.build();

@Getter
final WindowedEventRate lockRate = WindowedEventRate.builder()
.window(Duration.ofMinutes(10))
.bucketCount(60)
.build();


private ObjectLockerAdmin() {
ObjectLocker.listen((type, holder, duration) -> {
switch(type) {
case LOCK:
maxDepth = Math.max(maxDepth, holder.lock.getHoldCount());

if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), holder.key);
maxConcurrency = Math.max(holder.lock.getQueueLength(), maxConcurrency);
}
if (holder.lock.getHoldCount() == 1) {
lockCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).incrementAndGet();
currentCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).incrementAndGet();
lockRate.newEvent();
averageLockAcquireTime.accept(duration.toMillis());
}
break;
case UNLOCK:
currentCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).decrementAndGet();

}
});
}

/**
* Number of locks per 'reason'.
* Total number of locks per 'reason'. Never decreases
*/
Map<String, AtomicInteger> lockCount = new HashMap<>();
private final Map<String, AtomicInteger> lockCount = new HashMap<>();

/**
* Count per 'reason'.
* Current count per 'reason'.
*/
Map<String, AtomicInteger> currentCount = new HashMap<>();

@Getter
int maxConcurrency = 0;
private int maxConcurrency = 0;

@Getter
int maxDepth = 0;
private int maxDepth = 0;

@Override
public void resetMaxValues() {
maxConcurrency = 0;
maxDepth = 0;
}

@Override
public Set<String> getLocks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ public interface ObjectLockerAdminMXBean {
@Description("The current number of locks. Should be a low number, most of the time zero.")
int getCurrentCount();

@Description("The total number of acquired locks per reason. So this grows continuously.")
@Description("The total number of acquired locks per reason.")
Map<String, Integer> getCurrentCounts();

@Description("The maximum concurrency level reached since the start of the application. I.e. the number of threads trying to acces the same lock simultaneously")
int getMaxConcurrency();



@Description("The maximum depth reach. I.e. the maximum number of 'nested' code locking the same mid.")
int getMaxDepth();

String getMaxLockAcquireTime();

void setMaxLockAcquireTime(String duration);

void resetMaxValues();

boolean isMonitor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.*;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import static nl.vpro.util.locker.ObjectLocker.withKeyLock;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -20,6 +22,7 @@
* @author Michiel Meeuwissen
*/
@Slf4j
@Execution(ExecutionMode.SAME_THREAD)
public class ObjectLockerTest {


Expand Down Expand Up @@ -100,6 +103,7 @@ public void withNullLock() throws InterruptedException, ExecutionException {
*/
@EqualsAndHashCode
static class Key implements Serializable {
private static final long serialVersionUID = -1689250631089355976L;
private final String v;

Key(String value) {
Expand Down Expand Up @@ -171,17 +175,30 @@ public void twoSameLocks() {
}

@Test
public void twoDifferentLocks() {
public void twoDifferentLocksAndTestLockerAdmin() {
ObjectLocker.stricltyOne = false;

ObjectLockerAdmin.JMX_INSTANCE.resetMaxValues();
int before = ObjectLockerAdmin.JMX_INSTANCE.getLockCount();
final List<String> listenedEvents = new CopyOnWriteArrayList<>();
ObjectLocker.Listener listener = (type, holder, duration) -> listenedEvents.add(type + ":" + holder.key);
final List<String> events = new CopyOnWriteArrayList<>();
ObjectLocker.listen(listener);
withKeyLock(new Key("keya"), "test2", () -> {
events.add("a1");
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(1);
withKeyLock(new Key("keyb"), "nested", () -> {
events.add("b2");
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(2);

});
});
assertThat(events).containsExactly("a1", "b2");
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(0);
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getLockCount()).isEqualTo(before + 2);
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getMaxDepth()).isEqualTo(1);
assertThat(ObjectLockerAdmin.JMX_INSTANCE.getMaxConcurrency()).isEqualTo(0);

assertThat(listenedEvents).containsExactly("LOCK:keya", "LOCK:keyb", "UNLOCK:keyb", "UNLOCK:keya");


}
Expand Down