diff --git a/vpro-shared-monitoring/pom.xml b/vpro-shared-monitoring/pom.xml
index 574324738..a5f21a2c6 100644
--- a/vpro-shared-monitoring/pom.xml
+++ b/vpro-shared-monitoring/pom.xml
@@ -16,12 +16,12 @@
io.micrometer
micrometer-registry-prometheus
- 1.6.6
+ 1.7.0
net.sf.ehcache
ehcache
- 2.10.6
+ 2.10.9.2
provided
diff --git a/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringConfig.java b/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringConfig.java
index 8f8eee982..69d7cd1ca 100644
--- a/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringConfig.java
+++ b/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringConfig.java
@@ -1,5 +1,6 @@
package nl.vpro.monitoring.config;
+import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.cache.EhCache2Metrics;
import io.micrometer.core.instrument.binder.db.PostgreSQLDatabaseMetrics;
@@ -26,6 +27,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import nl.vpro.util.locker.ObjectLocker;
+import nl.vpro.util.locker.ObjectLockerAdmin;
+
+import static nl.vpro.util.locker.ObjectLocker.Listener.Type.LOCK;
+
@Configuration
public class MonitoringConfig {
@@ -92,6 +98,40 @@ public PrometheusMeterRegistry globalMeterRegistry() {
new DiskSpaceMetrics(new File(folder)).bindTo(registry);
}
}
+ if (properties.isMeterLocks()) {
+ ObjectLocker.listen((type, holder, duration) -> {
+ if (holder.lock.getHoldCount() == 1 && type == LOCK) {
+ Object key = holder.key;
+ String keyType = key instanceof ObjectLocker.DefinesType ? String.valueOf(((ObjectLocker.DefinesType) key).getType()) : key.getClass().getSimpleName();
+ registry.counter("locks.event", "type", keyType).increment();
+ }
+ });
+ Gauge.builder("locks.count", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getCurrentCount)
+ .description("The current number of locked objects")
+ .register(registry);
+
+ Gauge.builder("locks.total_count", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getLockCount)
+ .description("The total number of locked objects untill now")
+ .register(registry);
+
+ Gauge.builder("locks.average_acquiretime", () -> ObjectLockerAdmin.JMX_INSTANCE.getAverageLockAcquireTime().getWindowValue().getValue())
+ .description("The average time in ms to acquire a lock")
+ .register(registry);
+
+ Gauge.builder("locks.max_concurrency", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getMaxConcurrency)
+ .description("The maximum number threads waiting for the same object")
+ .register(registry);
+
+ Gauge.builder("locks.current_max_concurrency", () -> ObjectLocker.getLockedObjects().values().stream().mapToInt(l -> l.lock.getHoldCount()).max().orElse(0))
+ .description("The maximum number threads waiting for the same object")
+ .register(registry);
+
+ Gauge.builder("locks.maxDepth", ObjectLockerAdmin.JMX_INSTANCE, ObjectLockerAdmin::getMaxConcurrency)
+ .description("The maximum number of locked objects in the same thread")
+ .register(registry);
+
+ }
+
return registry;
}
diff --git a/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringProperties.java b/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringProperties.java
index 126a7ff3e..4afe25522 100644
--- a/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringProperties.java
+++ b/vpro-shared-monitoring/src/main/java/nl/vpro/monitoring/config/MonitoringProperties.java
@@ -66,4 +66,7 @@ public class MonitoringProperties {
@Value("${monitoring.volumes:#{null}}")
private List meterVolumes;
+
+ @Value("${monitoring.locks:true}")
+ private boolean meterLocks;
}
diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java
index b2488eb0a..398159917 100644
--- a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java
+++ b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLocker.java
@@ -8,7 +8,6 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
@@ -19,8 +18,6 @@
import nl.vpro.logging.Slf4jHelper;
-import static nl.vpro.util.locker.ObjectLockerAdmin.JMX_INSTANCE;
-
/**
* @author Michiel Meeuwissen
*/
@@ -42,11 +39,50 @@ private ObjectLocker() {
static Duration maxLockAcquireTime = Duration.ofMinutes(10);
static Duration minWaitTime = Duration.ofSeconds(5);
+
+ private static final List LISTENERS = new CopyOnWriteArrayList<>();
+
+ public static void listen(Listener listener){
+ LISTENERS.add(listener);
+ }
+
+ public static void unlisten(Listener listener){
+ LISTENERS.remove(listener);
+ }
+
+
+ @FunctionalInterface
+ public interface Listener extends EventListener {
+ enum Type {
+ LOCK,
+ UNLOCK
+ }
+
+ void event(Type type, LockHolder> holder, Duration duration);
+
+ default void lock(LockHolder> lock, Duration duration) {
+ event(Type.LOCK, lock, duration);
+ }
+
+ default void unlock(LockHolder> lock, Duration duration) {
+ event(Type.UNLOCK, lock, duration);
+ }
+ }
+
+ public interface DefinesType {
+ Object getType();
+ }
+
/**
* Map key -> ReentrantLock
*/
static final Map> LOCKED_OBJECTS = new ConcurrentHashMap<>();
+
+ public static Map> getLockedObjects() {
+ return Collections.unmodifiableMap(LOCKED_OBJECTS);
+ }
+
public static T withKeyLock(
Serializable id,
@NonNull String reason,
@@ -109,7 +145,6 @@ private static LockHolder acquireLock(
holder = locks.computeIfAbsent(key, (m) -> computeLock(m, reason, comparable));
if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), key);
- JMX_INSTANCE.maxConcurrency = Math.max(holder.lock.getQueueLength(), JMX_INSTANCE.maxConcurrency);
alreadyWaiting = true;
}
}
@@ -123,13 +158,15 @@ private static LockHolder acquireLock(
log.debug("Released and continuing {}", key);
}
- JMX_INSTANCE.maxDepth = Math.max(JMX_INSTANCE.maxDepth, holder.lock.getHoldCount());
log.trace("{} holdcount {}", Thread.currentThread().hashCode(), holder.lock.getHoldCount());
+ Duration acquireTime = Duration.ofNanos(System.nanoTime() - nanoStart);
if (holder.lock.getHoldCount() == 1) {
- JMX_INSTANCE.lockCount.computeIfAbsent(reason, s -> new AtomicInteger(0)).incrementAndGet();
- JMX_INSTANCE.currentCount.computeIfAbsent(reason, s -> new AtomicInteger()).incrementAndGet();
- Duration aquireTime = Duration.ofNanos(System.nanoTime() - nanoStart);
- log.debug("Acquired lock for {} ({}) in {}", key, reason, aquireTime);
+ log.debug("Acquired lock for {} ({}) in {}", key, reason, acquireTime);
+ }
+
+
+ for(Listener listener : LISTENERS) {
+ listener.lock(holder, acquireTime);
}
return holder;
}
@@ -183,8 +220,11 @@ private static void releaseLock(long nanoStart, K key,
log.trace("Removed {}", key);
locks.remove(key);
}
- JMX_INSTANCE.currentCount.computeIfAbsent(reason, s -> new AtomicInteger()).decrementAndGet();
Duration duration = Duration.ofNanos(System.nanoTime() - nanoStart);
+ for (Listener listener : LISTENERS) {
+ listener.unlock(lock, duration);
+ }
+
Slf4jHelper.log(log, duration.compareTo(Duration.ofSeconds(30))> 0 ? Level.WARN : Level.DEBUG,
"Released lock for {} ({}) in {}", key, reason, Duration.ofNanos(System.nanoTime() - nanoStart));
}
diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdmin.java b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdmin.java
index 1c0e1af2b..adca61c76 100644
--- a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdmin.java
+++ b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdmin.java
@@ -1,16 +1,20 @@
package nl.vpro.util.locker;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.time.Duration;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.meeuw.math.statistics.StatisticalLong;
+import org.meeuw.math.windowed.WindowedEventRate;
+import org.meeuw.math.windowed.WindowedStatisticalLong;
+
import nl.vpro.jmx.MBeans;
import nl.vpro.util.TimeUtils;
@@ -18,6 +22,7 @@
* @author Michiel Meeuwissen
* @since 5.8
*/
+@Slf4j
public class ObjectLockerAdmin implements ObjectLockerAdminMXBean {
@@ -32,25 +37,66 @@ public class ObjectLockerAdmin implements ObjectLockerAdminMXBean {
}
}
- private ObjectLockerAdmin() {
+ @Getter
+ final WindowedStatisticalLong averageLockAcquireTime = WindowedStatisticalLong.builder()
+ .mode(StatisticalLong.Mode.DURATION)
+ .window(Duration.ofMinutes(60))
+ .bucketCount(60)
+ .build();
+
+ @Getter
+ final WindowedEventRate lockRate = WindowedEventRate.builder()
+ .window(Duration.ofMinutes(10))
+ .bucketCount(60)
+ .build();
+
+
+ private ObjectLockerAdmin() {
+ ObjectLocker.listen((type, holder, duration) -> {
+ switch(type) {
+ case LOCK:
+ maxDepth = Math.max(maxDepth, holder.lock.getHoldCount());
+
+ if (holder.lock.isLocked() && !holder.lock.isHeldByCurrentThread()) {
+ log.debug("There are already threads ({}) for {}, waiting", holder.lock.getQueueLength(), holder.key);
+ maxConcurrency = Math.max(holder.lock.getQueueLength(), maxConcurrency);
+ }
+ if (holder.lock.getHoldCount() == 1) {
+ lockCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).incrementAndGet();
+ currentCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).incrementAndGet();
+ lockRate.newEvent();
+ averageLockAcquireTime.accept(duration.toMillis());
+ }
+ break;
+ case UNLOCK:
+ currentCount.computeIfAbsent(holder.reason, s -> new AtomicInteger()).decrementAndGet();
+
+ }
+ });
}
/**
- * Number of locks per 'reason'.
+ * Total number of locks per 'reason'. Never decreases
*/
- Map lockCount = new HashMap<>();
+ private final Map lockCount = new HashMap<>();
/**
- * Count per 'reason'.
+ * Current count per 'reason'.
*/
Map currentCount = new HashMap<>();
@Getter
- int maxConcurrency = 0;
+ private int maxConcurrency = 0;
@Getter
- int maxDepth = 0;
+ private int maxDepth = 0;
+
+ @Override
+ public void resetMaxValues() {
+ maxConcurrency = 0;
+ maxDepth = 0;
+ }
@Override
public Set getLocks() {
diff --git a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdminMXBean.java b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdminMXBean.java
index 0d6eeae05..76db0e379 100644
--- a/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdminMXBean.java
+++ b/vpro-shared-util/src/main/java/nl/vpro/util/locker/ObjectLockerAdminMXBean.java
@@ -23,13 +23,14 @@ public interface ObjectLockerAdminMXBean {
@Description("The current number of locks. Should be a low number, most of the time zero.")
int getCurrentCount();
- @Description("The total number of acquired locks per reason. So this grows continuously.")
+ @Description("The total number of acquired locks per reason.")
Map getCurrentCounts();
@Description("The maximum concurrency level reached since the start of the application. I.e. the number of threads trying to acces the same lock simultaneously")
int getMaxConcurrency();
+
@Description("The maximum depth reach. I.e. the maximum number of 'nested' code locking the same mid.")
int getMaxDepth();
@@ -37,6 +38,7 @@ public interface ObjectLockerAdminMXBean {
void setMaxLockAcquireTime(String duration);
+ void resetMaxValues();
boolean isMonitor();
diff --git a/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java b/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java
index 902d50c21..e466e119e 100644
--- a/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java
+++ b/vpro-shared-util/src/test/java/nl/vpro/util/locker/ObjectLockerTest.java
@@ -10,6 +10,8 @@
import java.util.concurrent.*;
import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
import static nl.vpro.util.locker.ObjectLocker.withKeyLock;
import static org.assertj.core.api.Assertions.assertThat;
@@ -20,6 +22,7 @@
* @author Michiel Meeuwissen
*/
@Slf4j
+@Execution(ExecutionMode.SAME_THREAD)
public class ObjectLockerTest {
@@ -100,6 +103,7 @@ public void withNullLock() throws InterruptedException, ExecutionException {
*/
@EqualsAndHashCode
static class Key implements Serializable {
+ private static final long serialVersionUID = -1689250631089355976L;
private final String v;
Key(String value) {
@@ -171,17 +175,30 @@ public void twoSameLocks() {
}
@Test
- public void twoDifferentLocks() {
+ public void twoDifferentLocksAndTestLockerAdmin() {
ObjectLocker.stricltyOne = false;
-
+ ObjectLockerAdmin.JMX_INSTANCE.resetMaxValues();
+ int before = ObjectLockerAdmin.JMX_INSTANCE.getLockCount();
+ final List listenedEvents = new CopyOnWriteArrayList<>();
+ ObjectLocker.Listener listener = (type, holder, duration) -> listenedEvents.add(type + ":" + holder.key);
final List events = new CopyOnWriteArrayList<>();
+ ObjectLocker.listen(listener);
withKeyLock(new Key("keya"), "test2", () -> {
events.add("a1");
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(1);
withKeyLock(new Key("keyb"), "nested", () -> {
events.add("b2");
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(2);
+
});
});
assertThat(events).containsExactly("a1", "b2");
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getCurrentCount()).isEqualTo(0);
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getLockCount()).isEqualTo(before + 2);
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getMaxDepth()).isEqualTo(1);
+ assertThat(ObjectLockerAdmin.JMX_INSTANCE.getMaxConcurrency()).isEqualTo(0);
+
+ assertThat(listenedEvents).containsExactly("LOCK:keya", "LOCK:keyb", "UNLOCK:keyb", "UNLOCK:keya");
}