diff --git a/api/src/main/java/org/apache/brooklyn/api/objs/HighlightTuple.java b/api/src/main/java/org/apache/brooklyn/api/objs/HighlightTuple.java
index d31b31733e..97e66337fc 100644
--- a/api/src/main/java/org/apache/brooklyn/api/objs/HighlightTuple.java
+++ b/api/src/main/java/org/apache/brooklyn/api/objs/HighlightTuple.java
@@ -18,18 +18,24 @@
*/
package org.apache.brooklyn.api.objs;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.Beta;
+
+/** A record of a highlight e.g. attached to an {@link EntityAdjunct}.
+ *
+ * Time of 0 or negative indicates the highlight is ongoing. Task ID is optional. */
+@Beta
public class HighlightTuple {
private String description;
private long time;
private String taskId;
- //required for JSON de-serialisation
- private HighlightTuple(){
-
- }
+ @SuppressWarnings("unused") //required for JSON de-serialisation
+ private HighlightTuple(){}
- public HighlightTuple(String description, long time, String taskId) {
+ public HighlightTuple(String description, long time, @Nullable String taskId) {
this.description = description;
this.time = time;
this.taskId = taskId;
@@ -51,7 +57,7 @@ public void setTime(long time) {
this.time = time;
}
- public String getTaskId() {
+ @Nullable public String getTaskId() {
return taskId;
}
@@ -78,4 +84,9 @@ public int hashCode() {
result = 31 * result + (taskId != null ? taskId.hashCode() : 0);
return result;
}
+
+ @Override
+ public String toString() {
+ return "highlight["+description+"; time="+System.currentTimeMillis()+(taskId==null ? "" : "; task="+taskId)+"]";
+ }
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
index 39b23ba11f..08a39be895 100644
--- a/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
+++ b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
@@ -51,7 +51,7 @@ public abstract class AbstractEnricher extends AbstractEntityAdjunct implements
"Whether duplicate values published by this enricher should be suppressed");
private static class DeduplicatingAttributeModifier implements Function> {
- public static Function> create(T newVal) {
+ public static DeduplicatingAttributeModifier create(T newVal) {
return new DeduplicatingAttributeModifier(newVal);
}
@@ -60,15 +60,20 @@ private DeduplicatingAttributeModifier(T newVal) {
}
private T newValue;
+ private Maybe lastValue;
@Override
public Maybe apply(T oldValue) {
if (Objects.equal(oldValue, newValue)) {
- return Maybe.absent("Skipping update, values equal");
+ return lastValue = Maybe.absent("Skipping update, values equal");
} else {
- return Maybe.of(newValue);
+ return lastValue = Maybe.of(newValue);
}
}
+
+ public Maybe getLastValue() {
+ return lastValue;
+ }
}
private final EnricherDynamicType enricherType;
@@ -129,16 +134,22 @@ protected void emit(Sensor sensor, Object val) {
}
T newVal = TypeCoercions.coerce(val, sensor.getTypeToken());
+ Maybe published = Maybe.of(newVal);
if (sensor instanceof AttributeSensor) {
AttributeSensor attribute = (AttributeSensor)sensor;
if (Boolean.TRUE.equals(suppressDuplicates)) {
- entity.sensors().modify(attribute, DeduplicatingAttributeModifier.create(newVal));
+ DeduplicatingAttributeModifier modifier = DeduplicatingAttributeModifier.create(newVal);
+ entity.sensors().modify(attribute, modifier);
+ published = modifier.getLastValue();
} else {
entity.sensors().set(attribute, newVal);
}
} else {
entity.sensors().emit(sensor, newVal);
}
+ if (published!=null && published.isPresent()) {
+ highlightActionPublishSensor(sensor, published.get());
+ }
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
index 0ce9169dcc..2b7113e650 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
@@ -285,6 +285,7 @@ public void init() {
if (uniqueTag==null) uniqueTag = DEFAULT_ENRICHER_UNIQUE_TAG;
}
+ @SuppressWarnings("unchecked")
@Override
public void setEntity(EntityLocal entity) {
super.setEntity(entity);
@@ -297,6 +298,8 @@ public void setEntity(EntityLocal entity) {
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_PROBLEMS, this);
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_UP, this);
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_STATE_EXPECTED, this);
+
+ highlightTriggers(MutableList.of(SERVICE_PROBLEMS, SERVICE_UP, SERVICE_STATE_EXPECTED), null);
}
@Override
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
index fba15caa52..84165eae17 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
@@ -24,6 +24,7 @@
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.ConfigKeys;
@@ -32,6 +33,7 @@
import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +103,7 @@ public void start() {
throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
}
- poller = new Poller(entity, getConfig(ONLY_IF_SERVICE_UP));
+ poller = new Poller(entity, this, getConfig(ONLY_IF_SERVICE_UP));
activated = true;
preStart();
synchronized (pollerStateMutex) {
@@ -225,4 +227,16 @@ protected void postStop() {
protected Poller> getPoller() {
return poller;
}
+
+ void highlightTriggerPeriod(Duration minPeriod) {
+ highlightTriggers("Running every "+minPeriod);
+ }
+
+ void onRemoveSensor(Sensor> sensor) {
+ highlightActionPublishSensor("Clear sensor "+sensor.getName());
+ }
+
+ void onPublishSensor(Sensor> sensor, Object v) {
+ highlightActionPublishSensor(sensor, v);
+ }
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
index ec69e51df8..fca844bf95 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -219,6 +219,7 @@ protected void setSensor(Object v) {
// nothing
} else if (v == FeedConfig.REMOVE) {
((EntityInternal)entity).removeAttribute(sensor);
+ feed.onRemoveSensor(sensor);
} else if (sensor == FeedConfig.NO_SENSOR) {
// nothing
} else {
@@ -227,6 +228,7 @@ protected void setSensor(Object v) {
// no change; nothing
} else {
entity.sensors().set(sensor, coercedV);
+ feed.onPublishSensor(sensor, coercedV);
}
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 1a29c48c82..5a78f37182 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -51,6 +51,7 @@ public class Poller {
public static final Logger log = LoggerFactory.getLogger(Poller.class);
private final Entity entity;
+ private final AbstractFeed feed;
private final boolean onlyIfServiceUp;
private final Set> oneOffJobs = new LinkedHashSet>();
private final Set> pollJobs = new LinkedHashSet>();
@@ -92,14 +93,15 @@ public void run() {
};
}
}
-
- /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */
+
+ /** @deprecated since 0.12.0 pass in feed */
@Deprecated
- public Poller(Entity entity) {
- this(entity, false);
- }
public Poller(Entity entity, boolean onlyIfServiceUp) {
+ this(entity, null, onlyIfServiceUp);
+ }
+ public Poller(Entity entity, AbstractFeed feed, boolean onlyIfServiceUp) {
this.entity = entity;
+ this.feed = feed;
this.onlyIfServiceUp = onlyIfServiceUp;
}
@@ -140,6 +142,7 @@ public void start() {
oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
}
+ Duration minPeriod = null;
for (final PollJob pollJob : pollJobs) {
final String scheduleName = pollJob.handler.getDescription();
if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
@@ -166,10 +169,17 @@ public Task> call() {
.period(pollJob.pollPeriod)
.cancelOnException(false);
tasks.add(Entities.submit(entity, task));
+ if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
+ minPeriod = pollJob.pollPeriod;
+ }
} else {
if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
}
}
+
+ if (minPeriod!=null && feed!=null) {
+ feed.highlightTriggerPeriod(minPeriod);
+ }
}
public void stop() {
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 2b8d9387d5..2c4c4b4662 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -28,12 +28,17 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.annotation.Nullable;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.Configurable;
import org.apache.brooklyn.api.objs.EntityAdjunct;
@@ -42,7 +47,6 @@
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
import org.apache.brooklyn.core.config.ConfigConstraints;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.internal.AbstractConfigMapImpl;
@@ -51,6 +55,7 @@
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
@@ -63,6 +68,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -113,9 +119,13 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
private Map highlights = new HashMap<>();
+ /** Name of a highlight that indicates the last action taken by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_ACTION = "lastAction";
+ /** Name of a highlight that indicates the last confirmation detected by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_CONFIRMATION= "lastConfirmation";
+ /** Name of a highlight that indicates the last violation detected by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_VIOLATION= "lastViolation";
+ /** Name of a highlight that indicates the triggers for this adjunct. */
public static String HIGHLIGHT_NAME_TRIGGERS = "triggers";
public AbstractEntityAdjunct() {
@@ -137,10 +147,6 @@ public AbstractEntityAdjunct(@SuppressWarnings("rawtypes") Map properties) {
}
}
- protected void addHighlight(String name, HighlightTuple tuple) {
- highlights.put(name, tuple);
- }
-
/**
* @deprecated since 0.7.0; only used for legacy brooklyn types where constructor is called directly
*/
@@ -568,6 +574,93 @@ public Map getHighlights() {
return highlightsToReturn;
}
+ /** Records a named highlight against this object, for persistence and API access.
+ * See common highlights including {@link #HIGHLIGHT_NAME_LAST_ACTION} and
+ * {@link #HIGHLIGHT_NAME_LAST_CONFIRMATION}.
+ * Also see convenience methods eg {@link #highlightOngoing(String, String)} and {@link #highlight(String, String, Task)}
+ * and {@link HighlightTuple}.
+ */
+ protected void setHighlight(String name, HighlightTuple tuple) {
+ highlights.put(name, tuple);
+ }
+
+ /** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item which is intended to be ongoing. */
+ protected void highlightOngoing(String name, String description) {
+ setHighlight(name, new HighlightTuple(description, 0, null));
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item with the current time. */
+ protected void highlightNow(String name, String description) {
+ setHighlight(name, new HighlightTuple(description, System.currentTimeMillis(), null));
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item with the current time and given task. */
+ protected void highlight(String name, String description, @Nullable Task> t) {
+ setHighlight(name, new HighlightTuple(description, System.currentTimeMillis(), t!=null ? t.getId() : null));
+ }
+
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_TRIGGERS} (as ongoing). */
+ protected void highlightTriggers(String description) {
+ highlightOngoing(HIGHLIGHT_NAME_TRIGGERS, description);
+ }
+ /** As {@link #highlightTriggers(String)} but convenience to generate a message given a sensor and source (entity or string description) */
+ protected void highlightTriggers(Sensor> s, Object source) {
+ highlightTriggers(Collections.singleton(s), Collections.singleton(source));
+ }
+ /** As {@link #highlightTriggers(String)} but convenience to generate a message given a list of sensors and source (entity or string description) */
+ protected void highlightTriggers(Iterable extends Sensor extends T>> s, Object source) {
+ highlightTriggers(s, (Iterable>) (source instanceof Iterable ? (Iterable>)source : Collections.singleton(source)));
+ }
+ /** As {@link #highlightTriggers(String)} but convenience to generate a message given a sensor and list of sources (entity or string description) */
+ protected void highlightTriggers(Sensor> s, Iterable sources) {
+ highlightTriggers(Collections.singleton(s), sources);
+ }
+ /** As {@link #highlightTriggers(String)} but convenience to generate a message given a list of sensors and list of sources (entity or string description) */
+ protected void highlightTriggers(Iterable extends Sensor extends T>> sensors, Iterable sources) {
+ StringBuilder msg = new StringBuilder("Listening for ");
+
+ if (sensors==null || Iterables.isEmpty(sensors)) {
+ msg.append("");
+ } else {
+ String sensorsText = MutableSet.copyOf(sensors).stream()
+ .filter(s -> s != null)
+ .map(s -> (s instanceof Sensor ? ((Sensor>) s).getName() : s.toString()))
+ .collect(Collectors.joining(", "));
+ msg.append(sensorsText);
+ }
+
+ if (sources!=null && !Iterables.isEmpty(sources)) {
+ String sourcesText = MutableSet.copyOf(sources).stream()
+ .filter(s -> s != null)
+ .map(s -> (s.equals(getEntity()) ? "self" : s.toString()))
+ .collect(Collectors.joining(", "));
+ if (!"self".equals(sourcesText)) {
+ msg.append(" on ").append(sourcesText);
+ }
+ }
+
+ highlightTriggers(msg.toString());
+ }
+
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_CONFIRMATION}. */
+ protected void highlightConfirmation(String description) {
+ highlightNow(HIGHLIGHT_NAME_LAST_CONFIRMATION, description);
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION}. */
+ protected void highlightAction(String description, Task> t) {
+ highlight(HIGHLIGHT_NAME_LAST_ACTION, description, t);
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION} when publishing a sensor. */
+ protected void highlightActionPublishSensor(Sensor> s, Object v) {
+ highlightActionPublishSensor("Publish "+s.getName()+" "+v);
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION} when publishing a sensor (freeform text). */
+ protected void highlightActionPublishSensor(String description) {
+ highlight(HIGHLIGHT_NAME_LAST_ACTION, description, null);
+ }
+ /** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_VIOLATION}. */
+ protected void highlightViolation(String description) {
+ highlightNow(HIGHLIGHT_NAME_LAST_VIOLATION, description);
+ }
+
/**
* Should only be used for rebind
* @param highlights
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
index 92363e6b6f..9cb7b99555 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
@@ -20,6 +20,8 @@
import static com.google.common.base.Preconditions.checkState;
+import java.util.Collection;
+import java.util.List;
import java.util.Set;
import org.apache.brooklyn.api.entity.Entity;
@@ -33,6 +35,7 @@
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.trait.Changeable;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.StringPredicates;
@@ -103,24 +106,31 @@ public void setEntity(EntityLocal entity) {
checkState(fromHardcodedProducers != null ^ producer != null, "must specify one of %s (%s) or %s (%s)",
PRODUCER.getName(), producer, FROM_HARDCODED_PRODUCERS.getName(), fromHardcodedProducers);
+ List producers = MutableList.of();
+
if (fromHardcodedProducers != null) {
for (Entity producer : Iterables.filter(fromHardcodedProducers, entityFilter)) {
+ producers.add(producer);
addProducerHardcoded(producer);
}
}
if (isAggregatingMembers()) {
+ producers.add(0, "members");
setEntityBeforeSubscribingProducerMemberEvents(entity);
setEntitySubscribeProducerMemberEvents();
setEntityAfterSubscribingProducerMemberEvents();
}
if (isAggregatingChildren()) {
+ producers.add(0, "children");
setEntityBeforeSubscribingProducerChildrenEvents();
setEntitySubscribingProducerChildrenEvents();
setEntityAfterSubscribingProducerChildrenEvents();
}
+ highlightTriggers(getSourceSensors(), producers);
+
onUpdated();
}
@@ -137,6 +147,8 @@ protected void setEntityLoadingConfig() {
setEntityLoadingTargetConfig();
}
+ protected abstract Collection> getSourceSensors();
+
protected Predicate> getDefaultValueFilter() {
if (getConfig(EXCLUDE_BLANK))
return StringPredicates.isNonBlank();
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
index c0cd36d1dc..7de27ca886 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
@@ -47,8 +47,6 @@ public abstract class AbstractMultipleSensorAggregator extends AbstractAggreg
public AbstractMultipleSensorAggregator() { }
- protected abstract Collection> getSourceSensors();
-
@Override
protected void setEntityLoadingConfig() {
super.setEntityLoadingConfig();
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
index 35138afc22..e2e325c73c 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
@@ -31,6 +31,7 @@
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.slf4j.Logger;
@@ -138,6 +139,8 @@ public void setEntity(EntityLocal entity) {
subscriptions().subscribe(MutableMap.of("notifyOfInitialValue", true), producer, (Sensor>)sensor, triggerListener);
}
}
+
+ highlightTriggers(MutableList.>of(sourceSensor).appendAll(triggerSensors), producer);
}
/** returns a function for transformation, for immediate use only (not for caching, as it may change) */
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
index 4c5294d8dd..b59d998dc2 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
@@ -33,6 +33,7 @@
* @deprecated since 0.7.0; use {@link Enrichers.builder()}
*/
@Deprecated
+// TODO this has active subclasses so i don't think it should be deprecated; possibly protected was the intention?
public abstract class AbstractTypeTransformingEnricher extends AbstractEnricher implements SensorEventListener {
@SetFromFlag
@@ -66,5 +67,7 @@ public void setEntity(EntityLocal entity) {
if (value!=null)
onEvent(new BasicSensorEvent(source, producer, value, -1));
}
+
+ highlightTriggers(source, producer);
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
index f1a85fa82f..1797d0db6d 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
@@ -90,6 +90,11 @@ public class Aggregator extends AbstractAggregator implements SensorEv
public Aggregator() {}
+ @Override
+ protected Collection> getSourceSensors() {
+ return Collections.singleton(sourceSensor);
+ }
+
@Override
@SuppressWarnings("unchecked")
protected void setEntityLoadingConfig() {
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java
index 961e16a72b..e2ab06c9a4 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java
@@ -107,6 +107,8 @@ public void setEntity(EntityLocal entity) {
}
}
}
+
+ highlightTriggers(sourceSensors, producer);
}
@Override
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java
index 51f599eb31..0b562f4e6d 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java
@@ -93,6 +93,7 @@ public void setEntity(EntityLocal entity) {
this.sourceSensor = (AttributeSensor) getRequiredConfig(SOURCE_SENSOR);
this.targetSensor = (Sensor) getRequiredConfig(TARGET_SENSOR);
+ highlightTriggers(sourceSensor, producer);
subscriptions().subscribe(producer, sourceSensor, this);
Object value = producer.getAttribute((AttributeSensor>) sourceSensor);
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
index 21b7ffe27b..1446d8d1b7 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
@@ -162,10 +162,16 @@ public void setEntity(EntityLocal entity) {
if (propagatingAll) {
subscriptions().subscribe(producer, null, this);
+ highlightTriggers("Listening for all sensors on "+producer);
} else {
for (Sensor> sensor : sensorMapping.keySet()) {
subscriptions().subscribe(producer, sensor, this);
}
+ if (sensorMapping.keySet().size() > 3) {
+ highlightTriggers("Listening for "+sensorMapping.keySet()+" sensors on "+producer);
+ } else {
+ highlightTriggers(sensorMapping.keySet(), producer);
+ }
}
emitAllAttributes();
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
index acd4dbdebf..f304bdc855 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
@@ -133,6 +133,8 @@ public void setEntity(EntityLocal entity) {
this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL);
subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), producer, sourceSensor, this);
+
+ highlightTriggers(sourceSensor, producer);
}
@Override
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
index c24869d943..f9b9950a55 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
@@ -113,6 +113,7 @@ public void setEntity(EntityLocal entity) {
}
subscribedSensors = ImmutableList.copyOf(sensorListTemp);
+ highlightTriggers(subscribedSensors, producer);
}
// Default implementation, subclasses should override
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
index d60ec84ccb..d61944c867 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
@@ -33,9 +33,11 @@
import org.apache.brooklyn.core.BrooklynLogging;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,6 +204,7 @@ protected void subscribeToGroup(final Group group) {
}
});
}
+ highlightTriggers(getSensorsToTrack(), "group members");
// The policy will have already fired events for its members.
if (!isRebinding()) {
@@ -216,10 +219,28 @@ protected void unsubscribeFromGroup() {
if (getSubscriptionTracker() != null && group != null) subscriptions().unsubscribe(group);
}
+ /** Invoked by framework prior to all entity events, to provide default highlight info;
+ * if subclasses provide their own they can overwrite this method to be no-op,
+ * or they can change the message passed to {@link #defaultHighlightAction(EventType, Entity, String)}
+ * which defaults to "Update on %s %s" */
+ // bit of a cheat but more informative than doing nothing; callers can override of course
+ protected void defaultHighlightAction(EventType type, Entity entity) {
+ defaultHighlightAction(type, entity, "Update on %s %s");
+ }
+ /** Records an {@link AbstractEntityAdjunct#HIGHLIGHT_NAME_LAST_ACTION} with the given message,
+ * formatted with arguments entity and either 'added', 'changed', or 'removed'.
+ * Can be overridden to be no-op if caller wants to manage their own such highlights,
+ * or to provide further information. See also {@link #defaultHighlightAction(EventType, Entity)}. */
+ protected void defaultHighlightAction(EventType type, Entity entity, String formattedMessage) {
+ highlightAction(String.format(formattedMessage, entity, Strings.removeFromStart(type.toString().toLowerCase(), "entity_")), null);
+ }
+
/** All entity events pass through this method. Default impl delegates to onEntityXxxx methods, whose default behaviours are no-op.
* Callers may override this to intercept all entity events in a single place, and to suppress subsequent processing if desired.
*/
- protected void onEntityEvent(EventType type, Entity entity) {
+ protected void onEntityEvent(EventType type, Entity entity) {
+ defaultHighlightAction(type, entity);
+
switch (type) {
case ENTITY_CHANGE: onEntityChange(entity); break;
case ENTITY_ADDED: onEntityAdded(entity); break;
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
index 32ea94c56e..ea2ec5541f 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
@@ -24,6 +24,7 @@
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.MapConfigKey;
@@ -143,8 +144,9 @@ protected void onEntityEvent(EventType type, Entity member) {
execute(member, command, type.name(), member.getId());
break;
case ALL_MEMBERS:
+ highlightAction("Run at all members ("+getGroup().getMembers().size()+")", null);
for (Entity each : getGroup().getMembers()) {
- execute(each, command, type.name(), member.getId());
+ execute(each, command, type.name(), member.getId(), false);
}
break;
default:
@@ -153,8 +155,11 @@ protected void onEntityEvent(EventType type, Entity member) {
}
}
- @SuppressWarnings("unchecked")
public void execute(Entity target, String command, String type, String memberId) {
+ execute(target, command, type, memberId, true);
+ }
+ @SuppressWarnings("unchecked")
+ private void execute(Entity target, String command, String type, String memberId, boolean highlight) {
if (Entities.isNoLongerManaged(target)) return;
Lifecycle state = target.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
if (state==Lifecycle.STOPPING || state==Lifecycle.STOPPED) return;
@@ -194,7 +199,11 @@ public void execute(Entity target, String command, String type, String memberId)
.summary("group-" + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_HYPHEN, type))
.environmentVariables(serializer.serialize(env));
- String output = DynamicTasks.submit(task.newTask(), target).getUnchecked();
+ Task taskI = DynamicTasks.submit(task.newTask(), target);
+ if (highlight) {
+ highlightAction("Run at "+machine.get().getAddress().getHostAddress(), taskI);
+ }
+ String output = taskI.getUnchecked();
LOG.trace("Command returned: {}", output);
}
}
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/BasicPolicyTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/BasicPolicyTest.java
index 4da4188730..69c7de0b1b 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/BasicPolicyTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/BasicPolicyTest.java
@@ -34,6 +34,7 @@
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.testng.annotations.Test;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
@@ -71,10 +72,10 @@ protected void doReconfigureConfig(ConfigKey key, T val) {
}
}
- //make visable for testing
+ @VisibleForTesting
@Override
- protected void addHighlight(String name, HighlightTuple tuple) {
- super.addHighlight(name, tuple);
+ protected void setHighlight(String name, HighlightTuple tuple) {
+ super.setHighlight(name, tuple);
}
}
@@ -117,7 +118,7 @@ public void testHighlights() throws Exception {
MyPolicy policy = new MyPolicy();
HighlightTuple highlight = new HighlightTuple("TEST_DESCRIPTION", 123L, "456");
- policy.addHighlight("testHighlightName", highlight);
+ policy.setHighlight("testHighlightName", highlight);
Map highlights = policy.getHighlights();
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/TransformingEnricherTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/TransformingEnricherTest.java
index 1be9a366c1..b38bd927b7 100644
--- a/core/src/test/java/org/apache/brooklyn/enricher/stock/TransformingEnricherTest.java
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/TransformingEnricherTest.java
@@ -75,9 +75,10 @@ public void testTransformingEnricher() throws Exception {
producer.enrichers().add(Enrichers.builder()
.transforming(intSensorA)
- //.computing(MathFunctions.times(2)) // TODO calling it before "publishing" means it doesn't check return type!
+ .computing(MathFunctions.times(2))
.publishing(target)
- .computing((Function)MathFunctions.times(2)) // TODO doesn't match strongly typed int->long
+ // note: if `computing` comes later, we lose some type inference, have to give explicit types or go unchecked
+ //.computing((Function)MathFunctions.times(2))
.build());
EntityAsserts.assertAttributeEqualsEventually(producer, target, 6L);
@@ -157,7 +158,7 @@ public void testTriggeringSensorNamesResolvedFromStrings() throws Exception {
// Doing nasty casting here, but in YAML we could easily get passed this.
producer.enrichers().add(EnricherSpec.create(Transformer.class)
.configure(Transformer.TARGET_SENSOR, target)
- .configure(Transformer.TRIGGER_SENSORS, (List>)(List)ImmutableList.of(intSensorA.getName(), intSensorB.getName()))
+ .configure(Transformer.TRIGGER_SENSORS, (List>)(List>)ImmutableList.of(intSensorA.getName(), intSensorB.getName()))
.configure(Transformer.PRODUCER, producer)
.configure(Transformer.TARGET_VALUE, new DeferredSupplier() {
@Override public Long get() {
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index a09c39d943..0ce3f9c780 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -32,6 +32,8 @@
import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.objs.HighlightTuple;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
@@ -52,6 +54,7 @@
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -470,13 +473,24 @@ public void onEvent(SensorEvent event) {
Sensor sensor = event.getSensor();
Preconditions.checkArgument(sensor.equals(DynamicCluster.GROUP_SIZE), "Unexpected sensor " + sensor);
Integer size = event.getValue();
+ String targetRange = getMinPoolSize()+" - "+getMaxPoolSize();
if (size > getMaxPoolSize()) {
- scheduleResize(getMaxPoolSize());
+ highlightViolation("Size "+size+" too large (target "+targetRange+")");
+ scheduleResize(getMaxPoolSize(), decap(getHighlights().get(HIGHLIGHT_NAME_LAST_VIOLATION)));
} else if (size < getMinPoolSize()) {
- scheduleResize(getMinPoolSize());
+ highlightViolation("Size "+size+" too small (target "+targetRange+")");
+ scheduleResize(getMinPoolSize(), decap(getHighlights().get(HIGHLIGHT_NAME_LAST_VIOLATION)));
+ } else {
+ highlightConfirmation("Size "+size+" in target range "+targetRange);
}
}
};
+
+ private String decap(HighlightTuple t) {
+ String msg = t==null ? null : t.getDescription();
+ if (Strings.isBlank(msg)) return null;
+ return Character.toLowerCase(msg.charAt(0))+msg.substring(1);
+ }
public AutoScalerPolicy() {
}
@@ -703,6 +717,9 @@ public void setEntity(EntityLocal entity) {
if (getMetric() != null) {
Entity entityToSubscribeTo = (getEntityWithMetric() != null) ? getEntityWithMetric() : entity;
subscriptions().subscribe(entityToSubscribeTo, getMetric(), metricEventHandler);
+ highlightTriggers(getMetric(), entityToSubscribeTo);
+ } else {
+ highlightTriggers("Listening for standard size and pool hot/cold sensors (no specific metric)");
}
subscriptions().subscribe(poolEntity, getPoolColdSensor(), utilizationEventHandler);
subscriptions().subscribe(poolEntity, getPoolHotSensor(), utilizationEventHandler);
@@ -831,15 +848,21 @@ private void analyze(ScalingData data, String description) {
*/
if (data.isHot()) {
// scale out
+ highlightViolation("Metric "+String.format("%.02f", data.currentMetricValue)+" too hot "
+ + "(target range "+String.format("%.02f", data.metricLowerBound)+"-"+String.format("%.02f", data.metricUpperBound)+")");
desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound);
data.scalingMode = ScalingType.HOT;
} else if (data.isCold()) {
// scale back
+ highlightViolation("Metric "+String.format("%.02f", data.currentMetricValue)+" too cold "
+ + "(target range "+String.format("%.02f", data.metricLowerBound)+"-"+String.format("%.02f", data.metricUpperBound)+")");
desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound);
data.scalingMode = ScalingType.COLD;
} else {
+ highlightConfirmation("Metric "+String.format("%.02f", data.currentMetricValue)+" in "
+ + "target range "+String.format("%.02f", data.metricLowerBound)+"-"+String.format("%.02f", data.metricUpperBound));
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound});
abortResize(data.currentSize);
return; // within the healthy range; no-op
@@ -893,14 +916,14 @@ private void analyze(ScalingData data, String description) {
if (data.scalingMode!=null) {
if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing {} {} from {} to {} ({} < {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, desiredSizeUnconstrained});
- scheduleResize(desiredSize);
+ scheduleResize(desiredSize, "metric "+data.currentMetricValue+" out of range");
} else {
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained});
abortResize(data.currentSize);
// but add to the unbounded record for future consideration
}
- onNewUnboundedPoolSize(desiredSizeUnconstrained);
+ onNewUnboundedPoolSize(desiredSizeUnconstrained, "ideal unconstrained size is "+desiredSizeUnconstrained);
}
private int applyMinMaxConstraints(long desiredSize) {
@@ -931,10 +954,10 @@ private void onPoolOk(Map properties) {
* executes, it will resize to whatever the latest value is to be (rather than what it was told
* to do at the point the job was queued).
*/
- private void scheduleResize(final int newSize) {
+ private void scheduleResize(final int newSize, String reason) {
recentDesiredResizes.add(newSize);
- scheduleResize();
+ scheduleResize(reason);
}
/**
@@ -944,10 +967,10 @@ private void scheduleResize(final int newSize) {
* Piggy-backs off the existing scheduleResize execution, which now also checks if the listener
* needs to be called.
*/
- private void onNewUnboundedPoolSize(final int val) {
+ private void onNewUnboundedPoolSize(final int val, String reason) {
if (getMaxSizeReachedSensor() != null) {
recentUnboundedResizes.add(val);
- scheduleResize();
+ scheduleResize(reason);
}
}
@@ -966,7 +989,7 @@ private boolean isEntityUp() {
}
}
- private void scheduleResize() {
+ private void scheduleResize(String reason) {
// TODO Make scale-out calls concurrent, rather than waiting for first resize to entirely
// finish. On ec2 for example, this can cause us to grow very slowly if first request is for
// just one new VM to be provisioned.
@@ -982,8 +1005,8 @@ private void scheduleResize() {
executorTime = System.currentTimeMillis();
executorQueued.set(false);
- resizeNow();
- notifyMaxReachedIfRequiredNow();
+ resizeNow(reason);
+ notifyMaxReachedIfRequiredNow(reason);
} catch (Exception e) {
if (isRunning()) {
@@ -1006,8 +1029,9 @@ private void scheduleResize() {
* those values have been within a time window. The time window used is the "maxReachedNotificationDelay",
* which determines how many milliseconds after being consistently above the max-size will it take before
* we emit the sensor event (if any).
+ * @param reason
*/
- private void notifyMaxReachedIfRequiredNow() {
+ private void notifyMaxReachedIfRequiredNow(String reason) {
BasicNotificationSensor super MaxPoolSizeReachedEvent> maxSizeReachedSensor = getMaxSizeReachedSensor();
if (maxSizeReachedSensor == null) {
return;
@@ -1044,14 +1068,14 @@ private void notifyMaxReachedIfRequiredNow() {
// TODO Could check if there has been anything bigger than "min" since min happened (would be more efficient)
if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling max-reached check for {}, as unbounded size not stable (min {}, max {}, latest {})",
new Object[] {this, poolEntity, valsSummary.min, valsSummary.max, valsSummary.latest});
- scheduleResize();
+ scheduleResize(reason);
} else {
// nothing to write home about; continually below maxAllowed
}
}
- private void resizeNow() {
+ private void resizeNow(String reason) {
final int currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
CalculatedDesiredPoolSize calculatedDesiredPoolSize = calculateDesiredPoolSize(currentPoolSize);
long desiredPoolSize = calculatedDesiredPoolSize.size;
@@ -1065,7 +1089,7 @@ private void resizeNow() {
// (note we continue now with as "good" a resize as we can given the instability)
if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...",
new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
- scheduleResize();
+ scheduleResize(reason);
}
if (currentPoolSize == targetPoolSize) {
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}",
@@ -1076,7 +1100,7 @@ private void resizeNow() {
if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}",
new Object[] {this, targetPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
- Entities.submit(entity, Tasks.builder().displayName("Auto-scaler")
+ Task t = Entities.submit(entity, Tasks.builder().displayName("Auto-scaler")
.description("Auto-scaler recommending resize from "+currentPoolSize+" to "+targetPoolSize)
.tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
.body(new Callable() {
@@ -1095,8 +1119,10 @@ public Void call() throws Exception {
}
return null;
}
- }).build())
- .blockUntilEnded();
+ }).build());
+ highlightAction("Resize from "+currentPoolSize+" to "+targetPoolSize+
+ (reason!=null ? " because "+reason : ""), t);
+ t.blockUntilEnded();
}
/**
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
index 4cc05412d3..583c0ea8bd 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -237,7 +237,7 @@ protected void setActualState(Maybe state) {
publishEntityFailedTime = now + republishDelay.toMilliseconds();
recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
}
- entity.sensors().emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
+ emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
config().set(LAST_PUBLISHED, LastPublished.FAILED);
} else {
recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
@@ -248,7 +248,7 @@ protected void setActualState(Maybe state) {
if (LOG.isDebugEnabled()) LOG.debug("{} publishing recovered (state={}; currentRecoveryStartTime={}; now={}",
new Object[] {this, state, Time.makeDateString(currentRecoveryStartTime), Time.makeDateString(now)});
publishEntityRecoveredTime = null;
- entity.sensors().emit(HASensors.ENTITY_RECOVERED, new HASensors.FailureDescriptor(entity, null));
+ emit(HASensors.ENTITY_RECOVERED, new HASensors.FailureDescriptor(entity, null));
config().set(LAST_PUBLISHED, LastPublished.RECOVERED);
} else {
recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
index 9173895236..d15b122b0d 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -29,6 +29,7 @@
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
@@ -116,6 +117,7 @@ public void setEntity(final EntityLocal entity) {
// for them; or could write events to a blocking queue and have onDetectedFailure read from that.
if (isRunning()) {
+ highlightViolation("Failure detected");
LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
@Override public void run() {
@@ -126,25 +128,45 @@ public void setEntity(final EntityLocal entity) {
}
}
});
+ highlightTriggers(failureSensorToMonitor, "members");
}
// TODO semaphores would be better to allow at-most-one-blocking behaviour
protected synchronized void onDetectedFailure(SensorEvent event) {
final Entity failedEntity = event.getSource();
final Object reason = event.getValue();
+ String violationText = "Failure detected at "+failedEntity+(reason!=null ? " ("+reason+")" : "");
if (isSuspended()) {
+ highlightViolation(violationText+" but policy is suspended");
LOG.warn("ServiceReplacer suspended, so not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
return;
}
- if (isRepeatedlyFailingTooMuch()) {
+
+ Integer failOnNumRecurringFailures = getConfig(FAIL_ON_NUM_RECURRING_FAILURES);
+ long failOnRecurringFailuresInThisDuration = getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION);
+ long oldestPermitted = currentTimeMillis() - failOnRecurringFailuresInThisDuration;
+ // trim old ones
+ for (Iterator iter = consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) {
+ Long timestamp = iter.next();
+ if (timestamp < oldestPermitted) {
+ iter.remove();
+ } else {
+ break;
+ }
+ }
+
+ if (consecutiveReplacementFailureTimes.size() >= failOnNumRecurringFailures) {
+ highlightViolation(violationText+" but too many recent failures detected: "
+ + consecutiveReplacementFailureTimes.size()+" in "+failOnRecurringFailuresInThisDuration+" exceeds limit of "+failOnNumRecurringFailures);
LOG.error("ServiceReplacer not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+"), because too many recent replacement failures");
return;
}
+ highlightViolation(violationText+", triggering restart");
LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
- ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
+ Task> t = ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
@Override
public void run() {
@@ -156,28 +178,12 @@ public void run() {
LOG.info("ServiceReplacer: ignoring error reported from stopping failed node "+failedEntity);
return;
}
+ highlightViolation(violationText+" and replace attempt failed: "+Exceptions.collapseText(e));
onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
}
}
});
- }
-
- private boolean isRepeatedlyFailingTooMuch() {
- Integer failOnNumRecurringFailures = getConfig(FAIL_ON_NUM_RECURRING_FAILURES);
- long failOnRecurringFailuresInThisDuration = getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION);
- long oldestPermitted = currentTimeMillis() - failOnRecurringFailuresInThisDuration;
-
- // trim old ones
- for (Iterator iter = consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) {
- Long timestamp = iter.next();
- if (timestamp < oldestPermitted) {
- iter.remove();
- } else {
- break;
- }
- }
-
- return (consecutiveReplacementFailureTimes.size() >= failOnNumRecurringFailures);
+ highlightAction("Replacing "+failedEntity, t);
}
protected long currentTimeMillis() {
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
index 03cae2e3f1..09837d8225 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -22,6 +22,7 @@
import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
@@ -110,6 +111,7 @@ public void setEntity(final EntityLocal entity) {
}
}
});
+ highlightTriggers(getConfig(FAILURE_SENSOR_TO_MONITOR), entity);
}
// TODO semaphores would be better to allow at-most-one-blocking behaviour
@@ -117,6 +119,7 @@ public void setEntity(final EntityLocal entity) {
// (as has been done in ServiceReplacer)
protected synchronized void onDetectedFailure(SensorEvent event) {
if (isSuspended()) {
+ highlightViolation("Failure detected but policy suspended");
LOG.warn("ServiceRestarter suspended, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
return;
}
@@ -126,12 +129,16 @@ protected synchronized void onDetectedFailure(SensorEvent event) {
Long last = lastFailureTime.getAndSet(current);
long elapsed = last==null ? -1 : current-last;
if (elapsed>=0 && elapsed <= getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION).toMilliseconds()) {
+ highlightViolation("Failure detected but policy ran "+Duration.millis(elapsed)+" ago (cannot run again within "+getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION)+")");
onRestartFailed("Restart failure (failed again after "+Time.makeTimeStringRounded(elapsed)+") at "+entity+": "+event.getValue());
return;
}
try {
+ highlightViolation("Failure detected and restart triggered");
ServiceStateLogic.setExpectedState(entity, Lifecycle.STARTING);
- Entities.invokeEffector(entity, entity, Startable.RESTART).get();
+ Task t = Entities.invokeEffector(entity, entity, Startable.RESTART);
+ highlightAction("Restart node on failure", t);
+ t.get();
} catch (Exception e) {
onRestartFailed("Restart failure (error "+e+") at "+entity+": "+event.getValue());
}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyRebindTest.java
index 10e67e998d..bf7d150d92 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyRebindTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyRebindTest.java
@@ -40,6 +40,7 @@
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -110,7 +111,9 @@ public void testRestoresAutoScalerConfig() throws Exception {
assertEquals(newPolicy.getConfig(AutoScalerPolicy.POOL_OK_SENSOR), POOL_OK_SENSOR);
assertEquals(newPolicy.getConfig(AutoScalerPolicy.MAX_SIZE_REACHED_SENSOR), MAX_SIZE_REACHED_SENSOR);
assertEquals(newPolicy.getConfig(AutoScalerPolicy.MAX_REACHED_NOTIFICATION_DELAY), Duration.of(7, TimeUnit.MILLISECONDS));
- assertTrue(newPolicy.getHighlights().isEmpty());
+ Asserts.assertSize(newPolicy.getHighlights().keySet(), 1);
+ assertEquals(newPolicy.getHighlights().get(AbstractEntityAdjunct.HIGHLIGHT_NAME_TRIGGERS).getDescription(),
+ "Listening for "+METRIC_SENSOR.getName());
}
@Test
diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/mocks/RestMockSimplePolicy.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/mocks/RestMockSimplePolicy.java
index 9f142c3493..09a06c5228 100644
--- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/mocks/RestMockSimplePolicy.java
+++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/mocks/RestMockSimplePolicy.java
@@ -33,8 +33,8 @@ public class RestMockSimplePolicy extends AbstractPolicy {
public RestMockSimplePolicy() {
super();
- this.addHighlight("testNameTask", new HighlightTuple("testDescription", 123L, "testTaskId"));
- this.addHighlight("testNameNoTask", new HighlightTuple("testDescription", 123L, null));
+ this.setHighlight("testNameTask", new HighlightTuple("testDescription", 123L, "testTaskId"));
+ this.setHighlight("testNameNoTask", new HighlightTuple("testDescription", 123L, null));
}
@SetFromFlag("sampleConfig")
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java
index e4f82ced61..3bb8a50043 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/pool/ServerPoolImpl.java
@@ -432,6 +432,7 @@ public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolic
protected void onEntityEvent(EventType type, Entity member) {
Boolean isUp = member.getAttribute(Attributes.SERVICE_UP);
LOG.info("{} in {}: {} service up is {}", new Object[]{type.name(), entity, member, isUp});
+ defaultHighlightAction(type, entity, "Update on %s %s (service "+(isUp==Boolean.TRUE ? "up" : isUp==Boolean.FALSE ? "not up" : "up value not known")+")");
if (type.equals(EventType.ENTITY_ADDED) || type.equals(EventType.ENTITY_CHANGE)) {
if (Boolean.TRUE.equals(isUp)) {
((ServerPoolImpl) entity).serverAdded(member);
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
index ea31359d26..23992874ee 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
@@ -49,6 +49,7 @@
import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.location.jclouds.networking.NetworkingEffectors;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
@@ -221,11 +222,13 @@ public void populateServiceNotUpDiagnostics() {
public static class UpdatingNotUpFromServiceProcessIsRunning extends AbstractEnricher implements SensorEventListener {
public UpdatingNotUpFromServiceProcessIsRunning() {}
+ @SuppressWarnings("unchecked")
@Override
public void setEntity(EntityLocal entity) {
super.setEntity(entity);
subscriptions().subscribe(entity, SERVICE_PROCESS_IS_RUNNING, this);
subscriptions().subscribe(entity, Attributes.SERVICE_UP, this);
+ highlightTriggers(MutableList.of(SERVICE_PROCESS_IS_RUNNING, Attributes.SERVICE_UP), entity);
onUpdated();
}
diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
index 20a0ac2e35..2846625e1a 100644
--- a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
@@ -69,7 +69,7 @@ public void setUp() throws Exception {
super.setUp();
// because the effector calls wait for a state change, use a separate thread to drive that
- poller = new Poller(app, false);
+ poller = new Poller(app, null, false);
poller.scheduleAtFixedRate(
new Callable() {
@Override