Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions api/src/main/java/org/apache/brooklyn/api/objs/HighlightTuple.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@
*/
package org.apache.brooklyn.api.objs;

import javax.annotation.Nullable;

import com.google.common.annotations.Beta;

/** A record of a highlight e.g. attached to an {@link EntityAdjunct}.
* <p>
* Time of 0 or negative indicates the highlight is ongoing. Task ID is optional. */
@Beta
public class HighlightTuple {

private String description;
private long time;
private String taskId;

//required for JSON de-serialisation
private HighlightTuple(){

}
@SuppressWarnings("unused") //required for JSON de-serialisation
private HighlightTuple(){}

public HighlightTuple(String description, long time, String taskId) {
public HighlightTuple(String description, long time, @Nullable String taskId) {
this.description = description;
this.time = time;
this.taskId = taskId;
Expand All @@ -51,7 +57,7 @@ public void setTime(long time) {
this.time = time;
}

public String getTaskId() {
@Nullable public String getTaskId() {
return taskId;
}

Expand All @@ -78,4 +84,9 @@ public int hashCode() {
result = 31 * result + (taskId != null ? taskId.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "highlight["+description+"; time="+System.currentTimeMillis()+(taskId==null ? "" : "; task="+taskId)+"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public abstract class AbstractEnricher extends AbstractEntityAdjunct implements
"Whether duplicate values published by this enricher should be suppressed");

private static class DeduplicatingAttributeModifier<T> implements Function<T, Maybe<T>> {
public static <T> Function<T, Maybe<T>> create(T newVal) {
public static <T> DeduplicatingAttributeModifier<T> create(T newVal) {
return new DeduplicatingAttributeModifier<T>(newVal);
}

Expand All @@ -60,15 +60,20 @@ private DeduplicatingAttributeModifier(T newVal) {
}

private T newValue;
private Maybe<T> lastValue;

@Override
public Maybe<T> apply(T oldValue) {
if (Objects.equal(oldValue, newValue)) {
return Maybe.absent("Skipping update, values equal");
return lastValue = Maybe.absent("Skipping update, values equal");
} else {
return Maybe.of(newValue);
return lastValue = Maybe.of(newValue);
}
}

public Maybe<T> getLastValue() {
return lastValue;
}
}

private final EnricherDynamicType enricherType;
Expand Down Expand Up @@ -129,16 +134,22 @@ protected <T> void emit(Sensor<T> sensor, Object val) {
}

T newVal = TypeCoercions.coerce(val, sensor.getTypeToken());
Maybe<T> published = Maybe.of(newVal);
if (sensor instanceof AttributeSensor) {
AttributeSensor<T> attribute = (AttributeSensor<T>)sensor;
if (Boolean.TRUE.equals(suppressDuplicates)) {
entity.sensors().modify(attribute, DeduplicatingAttributeModifier.create(newVal));
DeduplicatingAttributeModifier<T> modifier = DeduplicatingAttributeModifier.create(newVal);
entity.sensors().modify(attribute, modifier);
published = modifier.getLastValue();
} else {
entity.sensors().set(attribute, newVal);
}
} else {
entity.sensors().emit(sensor, newVal);
}
if (published!=null && published.isPresent()) {
highlightActionPublishSensor(sensor, published.get());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public void init() {
if (uniqueTag==null) uniqueTag = DEFAULT_ENRICHER_UNIQUE_TAG;
}

@SuppressWarnings("unchecked")
@Override
public void setEntity(EntityLocal entity) {
super.setEntity(entity);
Expand All @@ -297,6 +298,8 @@ public void setEntity(EntityLocal entity) {
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_PROBLEMS, this);
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_UP, this);
subscriptions().subscribe(notifyOfInitialValue, entity, SERVICE_STATE_EXPECTED, this);

highlightTriggers(MutableList.of(SERVICE_PROBLEMS, SERVICE_UP, SERVICE_STATE_EXPECTED), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.ConfigKeys;
Expand All @@ -32,6 +33,7 @@
import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,7 +103,7 @@ public void start() {
throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
}

poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP));
poller = new Poller<Object>(entity, this, getConfig(ONLY_IF_SERVICE_UP));
activated = true;
preStart();
synchronized (pollerStateMutex) {
Expand Down Expand Up @@ -225,4 +227,16 @@ protected void postStop() {
protected Poller<?> getPoller() {
return poller;
}

void highlightTriggerPeriod(Duration minPeriod) {
highlightTriggers("Running every "+minPeriod);
}

void onRemoveSensor(Sensor<?> sensor) {
highlightActionPublishSensor("Clear sensor "+sensor.getName());
}

void onPublishSensor(Sensor<?> sensor, Object v) {
highlightActionPublishSensor(sensor, v);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ protected void setSensor(Object v) {
// nothing
} else if (v == FeedConfig.REMOVE) {
((EntityInternal)entity).removeAttribute(sensor);
feed.onRemoveSensor(sensor);
} else if (sensor == FeedConfig.NO_SENSOR) {
// nothing
} else {
Expand All @@ -227,6 +228,7 @@ protected void setSensor(Object v) {
// no change; nothing
} else {
entity.sensors().set(sensor, coercedV);
feed.onPublishSensor(sensor, coercedV);
}
}
}
Expand Down
20 changes: 15 additions & 5 deletions core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class Poller<V> {
public static final Logger log = LoggerFactory.getLogger(Poller.class);

private final Entity entity;
private final AbstractFeed feed;
private final boolean onlyIfServiceUp;
private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>();
private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
Expand Down Expand Up @@ -92,14 +93,15 @@ public void run() {
};
}
}
/** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */

/** @deprecated since 0.12.0 pass in feed */
@Deprecated
public Poller(Entity entity) {
this(entity, false);
}
public Poller(Entity entity, boolean onlyIfServiceUp) {
this(entity, null, onlyIfServiceUp);
}
public Poller(Entity entity, AbstractFeed feed, boolean onlyIfServiceUp) {
this.entity = entity;
this.feed = feed;
this.onlyIfServiceUp = onlyIfServiceUp;
}

Expand Down Expand Up @@ -140,6 +142,7 @@ public void start() {
oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
}

Duration minPeriod = null;
for (final PollJob<V> pollJob : pollJobs) {
final String scheduleName = pollJob.handler.getDescription();
if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
Expand All @@ -166,10 +169,17 @@ public Task<?> call() {
.period(pollJob.pollPeriod)
.cancelOnException(false);
tasks.add(Entities.submit(entity, task));
if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
minPeriod = pollJob.pollPeriod;
}
} else {
if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
}
}

if (minPeriod!=null && feed!=null) {
feed.highlightTriggerPeriod(minPeriod);
}
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.Configurable;
import org.apache.brooklyn.api.objs.EntityAdjunct;
Expand All @@ -42,7 +47,6 @@
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.BasicConfigKey;
import org.apache.brooklyn.core.config.ConfigConstraints;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.internal.AbstractConfigMapImpl;
Expand All @@ -51,6 +55,7 @@
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
Expand All @@ -63,6 +68,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;


Expand Down Expand Up @@ -113,9 +119,13 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple

private Map<String, HighlightTuple> highlights = new HashMap<>();

/** Name of a highlight that indicates the last action taken by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_ACTION = "lastAction";
/** Name of a highlight that indicates the last confirmation detected by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_CONFIRMATION= "lastConfirmation";
/** Name of a highlight that indicates the last violation detected by this adjunct. */
public static String HIGHLIGHT_NAME_LAST_VIOLATION= "lastViolation";
/** Name of a highlight that indicates the triggers for this adjunct. */
public static String HIGHLIGHT_NAME_TRIGGERS = "triggers";

public AbstractEntityAdjunct() {
Expand All @@ -137,10 +147,6 @@ public AbstractEntityAdjunct(@SuppressWarnings("rawtypes") Map properties) {
}
}

protected void addHighlight(String name, HighlightTuple tuple) {
highlights.put(name, tuple);
}

/**
* @deprecated since 0.7.0; only used for legacy brooklyn types where constructor is called directly
*/
Expand Down Expand Up @@ -568,6 +574,93 @@ public Map<String, HighlightTuple> getHighlights() {
return highlightsToReturn;
}

/** Records a named highlight against this object, for persistence and API access.
* See common highlights including {@link #HIGHLIGHT_NAME_LAST_ACTION} and
* {@link #HIGHLIGHT_NAME_LAST_CONFIRMATION}.
* Also see convenience methods eg {@link #highlightOngoing(String, String)} and {@link #highlight(String, String, Task)}
* and {@link HighlightTuple}.
*/
protected void setHighlight(String name, HighlightTuple tuple) {
highlights.put(name, tuple);
}

/** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item which is intended to be ongoing. */
protected void highlightOngoing(String name, String description) {
setHighlight(name, new HighlightTuple(description, 0, null));
}
/** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item with the current time. */
protected void highlightNow(String name, String description) {
setHighlight(name, new HighlightTuple(description, System.currentTimeMillis(), null));
}
/** As {@link #setHighlight(String, HighlightTuple)}, convenience for recording an item with the current time and given task. */
protected void highlight(String name, String description, @Nullable Task<?> t) {
setHighlight(name, new HighlightTuple(description, System.currentTimeMillis(), t!=null ? t.getId() : null));
}

/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_TRIGGERS} (as ongoing). */
protected void highlightTriggers(String description) {
highlightOngoing(HIGHLIGHT_NAME_TRIGGERS, description);
}
/** As {@link #highlightTriggers(String)} but convenience to generate a message given a sensor and source (entity or string description) */
protected <T> void highlightTriggers(Sensor<?> s, Object source) {
highlightTriggers(Collections.singleton(s), Collections.singleton(source));
}
/** As {@link #highlightTriggers(String)} but convenience to generate a message given a list of sensors and source (entity or string description) */
protected <T> void highlightTriggers(Iterable<? extends Sensor<? extends T>> s, Object source) {
highlightTriggers(s, (Iterable<?>) (source instanceof Iterable ? (Iterable<?>)source : Collections.singleton(source)));
}
/** As {@link #highlightTriggers(String)} but convenience to generate a message given a sensor and list of sources (entity or string description) */
protected <U> void highlightTriggers(Sensor<?> s, Iterable<U> sources) {
highlightTriggers(Collections.singleton(s), sources);
}
/** As {@link #highlightTriggers(String)} but convenience to generate a message given a list of sensors and list of sources (entity or string description) */
protected <T,U> void highlightTriggers(Iterable<? extends Sensor<? extends T>> sensors, Iterable<U> sources) {
StringBuilder msg = new StringBuilder("Listening for ");

if (sensors==null || Iterables.isEmpty(sensors)) {
msg.append("<nothing>");
} else {
String sensorsText = MutableSet.<Object>copyOf(sensors).stream()
.filter(s -> s != null)
.map(s -> (s instanceof Sensor ? ((Sensor<?>) s).getName() : s.toString()))
.collect(Collectors.joining(", "));
msg.append(sensorsText);
}

if (sources!=null && !Iterables.isEmpty(sources)) {
String sourcesText = MutableSet.<Object>copyOf(sources).stream()
.filter(s -> s != null)
.map(s -> (s.equals(getEntity()) ? "self" : s.toString()))
.collect(Collectors.joining(", "));
if (!"self".equals(sourcesText)) {
msg.append(" on ").append(sourcesText);
}
}

highlightTriggers(msg.toString());
}

/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_CONFIRMATION}. */
protected void highlightConfirmation(String description) {
highlightNow(HIGHLIGHT_NAME_LAST_CONFIRMATION, description);
}
/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION}. */
protected void highlightAction(String description, Task<?> t) {
highlight(HIGHLIGHT_NAME_LAST_ACTION, description, t);
}
/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION} when publishing a sensor. */
protected void highlightActionPublishSensor(Sensor<?> s, Object v) {
highlightActionPublishSensor("Publish "+s.getName()+" "+v);
}
/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_ACTION} when publishing a sensor (freeform text). */
protected void highlightActionPublishSensor(String description) {
highlight(HIGHLIGHT_NAME_LAST_ACTION, description, null);
}
/** As {@link #setHighlight(String, HighlightTuple)} for {@link #HIGHLIGHT_NAME_LAST_VIOLATION}. */
protected void highlightViolation(String description) {
highlightNow(HIGHLIGHT_NAME_LAST_VIOLATION, description);
}

/**
* Should only be used for rebind
* @param highlights
Expand Down
Loading